Flink 版本 1.8.0
采用RocksDBStateBackend 程序上线后,运行过程中,经常会有如下异常: 2019-08-24 00:34:05,192 WARN org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Fail to subsume the old checkpoint. org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-50832': Directory is not empty at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) at org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2019-08-24 02:58:03,572 WARN org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Fail to subsume the old checkpoint. org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51120': Directory is not empty at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) at org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2019-08-24 03:21:00,784 WARN org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Fail to subsume the old checkpoint. org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51166': Directory is not empty at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) at org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 请教下,异常的原因,可能是啥。 |
Checkpoint 设置如下:
env.setStateBackend((StateBackend) new RocksDBStateBackend(checkpointDirectory,true)); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.setCheckpointInterval(30 * 1000); checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 王佩 <[hidden email]> 于2019年8月24日周六 下午4:49写道: > Flink 版本 1.8.0 > 采用RocksDBStateBackend > > 程序上线后,运行过程中,经常会有如下异常: > 2019-08-24 00:34:05,192 WARN > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Fail to subsume the old checkpoint. > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-50832': Directory > is not empty > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) > at > org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) > at > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2019-08-24 02:58:03,572 WARN > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Fail to subsume the old checkpoint. > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51120': Directory > is not empty > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) > at > org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) > at > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2019-08-24 03:21:00,784 WARN > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Fail to subsume the old checkpoint. > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51166': Directory > is not empty > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) > at > org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) > at > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > 请教下,异常的原因,可能是啥。 > |
In reply to this post by 王佩-2
https://issues.apache.org/jira/browse/FLINK-13856 <https://issues.apache.org/jira/browse/FLINK-13856>
可能是 s3a的问题,删除操作是不是阻塞的? 报错看起来是因删除文件夹时候前面的删除操作还没完成。文件不是空的 > 下面是被转发的邮件: > > 发件人: 王佩 <[hidden email]> > 主题: PathIsNotEmptyDirectoryException 异常 > 日期: 2019年8月24日 GMT+8 下午4:49:36 > 收件人: user-zh <[hidden email]> > 回复-收件人: [hidden email] > > Flink 版本 1.8.0 > 采用RocksDBStateBackend > > 程序上线后,运行过程中,经常会有如下异常: > 2019-08-24 00:34:05,192 WARN > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Fail to subsume the old checkpoint. > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-50832': Directory > is not empty > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) > at > org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) > at > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2019-08-24 02:58:03,572 WARN > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Fail to subsume the old checkpoint. > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51120': Directory > is not empty > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) > at > org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) > at > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2019-08-24 03:21:00,784 WARN > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Fail to subsume the old checkpoint. > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51166': Directory > is not empty > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) > at > org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) > at > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > 请教下,异常的原因,可能是啥。 |
在Flink看来,删除操作都是同一个线程内顺序执行的,S3AFileSystem.delete 接口看上去也不是异步执行的。出异常的时候,能检查一下chk-50832 目录里面还有什么么?
________________________________ From: Andrew Lin <[hidden email]> Sent: Tuesday, August 27, 2019 16:53 To: [hidden email] <[hidden email]>; [hidden email] <[hidden email]> Subject: Fwd: PathIsNotEmptyDirectoryException 异常 https://issues.apache.org/jira/browse/FLINK-13856 <https://issues.apache.org/jira/browse/FLINK-13856> 可能是 s3a的问题,删除操作是不是阻塞的? 报错看起来是因删除文件夹时候前面的删除操作还没完成。文件不是空的 > 下面是被转发的邮件: > > 发件人: 王佩 <[hidden email]> > 主题: PathIsNotEmptyDirectoryException 异常 > 日期: 2019年8月24日 GMT+8 下午4:49:36 > 收件人: user-zh <[hidden email]> > 回复-收件人: [hidden email] > > Flink 版本 1.8.0 > 采用RocksDBStateBackend > > 程序上线后,运行过程中,经常会有如下异常: > 2019-08-24 00:34:05,192 WARN > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Fail to subsume the old checkpoint. > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-50832': Directory > is not empty > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) > at > org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) > at > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2019-08-24 02:58:03,572 WARN > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Fail to subsume the old checkpoint. > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51120': Directory > is not empty > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) > at > org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) > at > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2019-08-24 03:21:00,784 WARN > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Fail to subsume the old checkpoint. > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51166': Directory > is not empty > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) > at > org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) > at > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > 请教下,异常的原因,可能是啥。 |
Free forum by Nabble | Edit this page |