The job uses RocksDB as its state backend. When the job restarts after a failure, the restart often fails with "Could not restore keyed state backend for KeyedProcessOperator". How can this be fixed?
Version: 1.10, standalone

Configuration:

state.backend: rocksdb
state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
state.backend.incremental: true
jobmanager.execution.failover-strategy: region
io.tmp.dirs: /data/flink1_10/tmp

Checkpoint configuration of the job:

env.enableCheckpointing(2 * 60 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Log:

2020-04-01 11:13:03
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 11 more
Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst -> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
	at java.nio.file.Files.createLink(Files.java:1086)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
	... 15 more

TaskManager error log:

2020-04-01 14:48:10,726 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
	at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
	at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
	at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
	at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
	at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
	at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
	at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
	at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
	at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
	at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
	at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
2020-04-01 14:48:10,726 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedProcessOperator_0cead2d40df6e304d3168f6366c79e3f_(1/2) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
	at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
	at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
	at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
	at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
	at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
	at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
	at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
	at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
	at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
	at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
	at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
	... 15 more
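A side note on the checkpoint settings above: Flink's documentation describes the minimum pause as the minimum time between the end of one checkpoint and the start of the next, so with the interval and the minimum pause both set to 2 minutes, checkpoints start roughly "checkpoint duration + 2 minutes" apart rather than every 2 minutes. The stdlib-only Java sketch below works through that arithmetic for a few hypothetical durations. The class and method names are made up for illustration, and the model is an assumption that covers only the interval and minimum-pause rules (no failures, no timeouts firing, no concurrency limits).

```java
public class CheckpointSpacing {

    /**
     * Earliest time (ms) at which the next checkpoint may start, under a
     * simplified model: the periodic interval is measured start-to-start,
     * and the minimum pause is measured end-to-start.
     */
    static long nextEarliestStart(long prevStart, long prevEnd,
                                  long intervalMs, long minPauseMs) {
        return Math.max(prevStart + intervalMs, prevEnd + minPauseMs);
    }

    public static void main(String[] args) {
        long interval = 2 * 60 * 1000; // env.enableCheckpointing(2 * 60 * 1000)
        long minPause = 2 * 60 * 1000; // setMinPauseBetweenCheckpoints(2 * 60 * 1000)
        long timeout = 60000;          // setCheckpointTimeout(60000): longest a checkpoint may run

        // Hypothetical checkpoint durations: fast, medium, and one that runs up to the timeout.
        long[] durations = {5_000, 30_000, timeout};
        for (long d : durations) {
            long start = 0;
            long end = start + d;
            long next = nextEarliestStart(start, end, interval, minPause);
            System.out.printf("duration %6d ms -> next checkpoint may start at %6d ms%n", d, next);
        }
    }
}
```

For example, under this model a checkpoint that takes 30 s lets the next one start at 150 s rather than 120 s.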
Hi
Restore can be roughly divided into two parts: 1) downloading the files, and 2) restoring from the downloaded files. Judging from the TM log, the download seems to have failed. Please double-check whether the file /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst exists. If the download did fail, you need to find out why it failed.

Best,
Congxian

(In reply to chenxyz, Wednesday, April 1, 2020, 3:02 PM)
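Congxian's two-phase description lines up with the traces in the first message: the TaskManager log shows the download phase (RocksDBStateDownloader) being interrupted with an InterruptedIOException, and the first trace fails later in restoreInstanceDirectoryFromPath at java.nio.file.Files.createLink. Because the JDK reports createLink failures as "<link> -> <existing>", db/001888.sst is the hard link being created and 3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst is the local copy it should point to. The stdlib-only sketch below imitates only that hard-link step in a temporary directory, to show that Files.createLink throws NoSuchFileException both when the downloaded file is missing and when the directory that should hold the link does not exist. It is not Flink's restore code; the class, method, and directory names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

public class HardLinkRestoreSketch {

    /**
     * Hard-links one downloaded file into a (hypothetical) db directory.
     * Instead of rethrowing NoSuchFileException, reports which path is absent.
     */
    static String linkIntoDb(Path downloadDir, Path dbDir, String fileName) throws IOException {
        Path existing = downloadDir.resolve(fileName); // local copy fetched from the checkpoint
        Path link = dbDir.resolve(fileName);           // hard link in the instance directory
        try {
            Files.createLink(link, existing);
            return "linked";
        } catch (NoSuchFileException e) {
            // e.getMessage() has the form "<link> -> <existing>", as in the trace.
            if (!Files.exists(existing)) {
                return "missing downloaded file";
            }
            if (!Files.isDirectory(dbDir)) {
                return "missing db directory";
            }
            return "NoSuchFileException: " + e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("restore-sketch");

        // Case 1: the file was downloaded and the db directory exists.
        Path download = Files.createDirectories(root.resolve("download-ok"));
        Path db = Files.createDirectories(root.resolve("db-ok"));
        Files.write(download.resolve("001888.sst"), new byte[] {1, 2, 3});
        System.out.println("case 1: " + linkIntoDb(download, db, "001888.sst"));

        // Case 2: the download was interrupted, so the file never reached local disk.
        Path partial = Files.createDirectories(root.resolve("download-partial"));
        Path db2 = Files.createDirectories(root.resolve("db-2"));
        System.out.println("case 2: " + linkIntoDb(partial, db2, "001888.sst"));

        // Case 3: the file is present, but the db directory does not exist (e.g. already cleaned up).
        System.out.println("case 3: " + linkIntoDb(download, root.resolve("db-missing"), "001888.sst"));
    }
}
```

On a Linux filesystem that supports hard links, this should print "linked", "missing downloaded file", and "missing db directory" for the three cases, which is why checking both the db directory and the downloaded copy is a reasonable first diagnostic.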
Hi Congxian,
I checked HDFS: /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160 is empty, and there is no db subdirectory under it either.

(In reply to Congxian Qiu, 2020-04-01 16:50:13)
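Congxian's two-phase description can be illustrated with a small, self-contained model. This is a simplified sketch under stated assumptions, not Flink's RocksDB restore code: phase 1 writes "downloaded" SST files into a temporary directory, and phase 2 hard-links them into a `db` directory using `java.nio.file.Files.createLink`, the same JDK call that appears in the `NoSuchFileException` stack trace of the original post. The class and method names (`RestoreSketch`, `missingFiles`, `linkIntoDb`), the `download` directory name, and the file `001887.sst` are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical two-phase restore model (NOT Flink's code):
 * phase 1 downloads SST files into a temporary directory,
 * phase 2 hard-links them into the RocksDB "db" directory.
 */
public class RestoreSketch {

    /** Returns the SST file names that are absent from the download directory. */
    static List<String> missingFiles(Path downloadDir, List<String> sstNames) {
        List<String> missing = new ArrayList<>();
        for (String name : sstNames) {
            if (!Files.isRegularFile(downloadDir.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    /** Phase 2: hard-links every downloaded file into dbDir. */
    static void linkIntoDb(Path downloadDir, Path dbDir, List<String> sstNames) throws IOException {
        Files.createDirectories(dbDir);
        for (String name : sstNames) {
            // createLink(link, existing) fails with NoSuchFileException when "existing" is absent.
            Files.createLink(dbDir.resolve(name), downloadDir.resolve(name));
        }
    }

    public static void main(String[] args) throws IOException {
        Path instanceDir = Files.createTempDirectory("restore-sketch");
        Path downloadDir = Files.createDirectories(instanceDir.resolve("download"));
        Path dbDir = instanceDir.resolve("db");
        List<String> ssts = Arrays.asList("001887.sst", "001888.sst");

        // Simulated phase 1: only one of the two files actually reached the local disk.
        Files.write(downloadDir.resolve("001887.sst"), new byte[] {1, 2, 3});

        System.out.println("missing: " + missingFiles(downloadDir, ssts));
        try {
            linkIntoDb(downloadDir, dbDir, ssts);
        } catch (NoSuchFileException e) {
            System.out.println("phase 2 failed: " + e);
        }
    }
}
```

Running `main` prints `missing: [001888.sst]`; on a typical Linux temp directory the link step should then fail with `NoSuchFileException`, because the file phase 2 expects never landed in the local directory. Checking for missing files before linking is what "double-check whether the file exists" amounts to.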
The path on HDFS is different from the local one. If you want to look at the HDFS paths, you would probably need to inspect the checkpoint metadata, which is rather cumbersome; the tests in CheckpointMetadataLoadingTest can serve as a reference.

I took another look at the TM log you posted. It looks like the `outputStream.close()` at line 148 of RocksDBStateDownloader (`downloadDataForStateHandle`) failed. One rather odd thing: the outputStream here should be a local file, yet the stack trace shows HadoopFileSystem. Is this problem stably reproducible? If so, could you enable debug logging and post the JM/TM logs? A job that reproduces it would be even better.

Best,
Congxian
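One possible reading of Congxian's observation, offered as a hypothesis that is not confirmed in this thread: a path such as /data/flink1_10/tmp/... carries no URI scheme, so if the file-system layer resolves scheme-less paths against a configured default (for example Flink's `fs.default-scheme` option pointing at `hdfs://nameservice1`), a "local" temp file would actually be created on HDFS. The sketch below only mimics that resolution rule with `java.net.URI`; it is not Flink's implementation, and `DefaultSchemeSketch`, `resolve`, and the `flink-io-example` path segment are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Hypothetical illustration (NOT Flink's code): a path without a scheme
 * borrows the scheme and authority of a configured default file system.
 */
public class DefaultSchemeSketch {

    static URI resolve(String path, URI defaultFs) {
        URI uri = URI.create(path);
        if (uri.getScheme() != null) {
            return uri; // an explicit scheme (file://, hdfs://, ...) is never rewritten
        }
        try {
            // No scheme: take scheme and authority from the default file system.
            return new URI(defaultFs.getScheme(), defaultFs.getAuthority(), uri.getPath(), null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot resolve " + path, e);
        }
    }

    public static void main(String[] args) {
        URI localDefault = URI.create("file:///");
        URI hdfsDefault = URI.create("hdfs://nameservice1");
        String tmp = "/data/flink1_10/tmp/flink-io-example/db/001888.sst";

        System.out.println(resolve(tmp, localDefault));
        System.out.println(resolve(tmp, hdfsDefault));
        System.out.println(resolve("file://" + tmp, hdfsDefault));
    }
}
```

With `file:///` as the default, the temp path stays a `file:` URI; with `hdfs://nameservice1` as the default, it becomes `hdfs://nameservice1/data/flink1_10/tmp/...`; a path with an explicit `file://` prefix keeps its scheme. If something like this is happening here, the cluster's default-scheme setting would be worth checking.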
Hi Congxian,
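Sorry, your email got lost in my flood of mail... Here is how I reproduce it: I simply restart the TM that runs this job, and the KeyedProcessFunction then fails to restore. Only the RocksDB state backend shows this error; with the FsStateBackend writing to HDFS, the job recovers normally. At first I thought the custom state inside my KeyedProcessFunction was failing to restore, but in the end even an empty KeyedProcessFunction could not be restored. Below is a simple demo:

    import java.io.Serializable;
    import java.util.Random;

    import lombok.AllArgsConstructor;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.Setter;

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.util.Collector;

    public class App {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint every 2 minutes; the state backend is not set in code but taken from the cluster configuration.
            env.enableCheckpointing(2 * 60 * 1000);

            DataStreamSource<Student> source = env.addSource(new SourceFunction<Student>() {
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<Student> ctx) throws Exception {
                    // Emit 100 records with random ids, then block until cancel() is called.
                    Random rand = new Random();
                    for (int i = 0; i < 100; i++) {
                        int id = rand.nextInt();
                        ctx.collect(new Student(id, "Tom" + id));
                    }
                    synchronized (this) {
                        while (running) {
                            this.wait();
                        }
                    }
                }

                @Override
                public void cancel() {
                    synchronized (this) {
                        running = false;
                        this.notifyAll();
                    }
                }
            });

            // A pass-through KeyedProcessFunction with no user-defined state.
            source.keyBy("id").process(new KeyedProcessFunction<Tuple, Student, Student>() {
                @Override
                public void processElement(Student value, Context ctx, Collector<Student> out) throws Exception {
                    out.collect(value);
                }
            }).addSink(new SinkFunction<Student>() {
                @Override
                public void invoke(Student value, Context context) throws Exception {
                    System.out.println(value);
                }
            });

            env.execute("test keyed process operator state restore....");
        }

        @Getter
        @Setter
        @NoArgsConstructor
        @AllArgsConstructor
        public static class Student implements Serializable {
            private static final long serialVersionUID = 3909702675393996601L;
            private Integer id;
            private String name;
        }
    }

Below is the log with DEBUG enabled:

    2020-04-14 11:42:44,679 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Establish JobManager connection for job 6fd13de6e9c84a51425f7cc34ce94940.
    2020-04-14 11:42:44,684 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Offer reserved slots to the leader of job 6fd13de6e9c84a51425f7cc34ce94940.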
2020-04-14 11:42:44,727 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Registered new allocation id ed04b5323aa885406201e85c9f8b7c78 for local state stores for job 6fd13de6e9c84a51425f7cc34ce94940.
2020-04-14 11:42:44,729 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Registered new local state store with configuration LocalRecoveryConfig{localRecoveryMode=false, localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/data/flink1_10/tmp/localState/aid_ed04b5323aa885406201e85c9f8b7c78], jobID=6fd13de6e9c84a51425f7cc34ce94940, jobVertexID=bc764cd8ddf7a0cff126f51c16239658, subtaskIndex=0}} for 6fd13de6e9c84a51425f7cc34ce94940 - bc764cd8ddf7a0cff126f51c16239658 - 0 under allocation id ed04b5323aa885406201e85c9f8b7c78.
2020-04-14 11:42:44,742 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionFactory - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Initialized org.apache.flink.runtime.io.network.partition.ResultPartitionFactory@41801faf
2020-04-14 11:42:44,747 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: Custom Source (1/1).
2020-04-14 11:42:44,748 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) switched from CREATED to DEPLOYING.
2020-04-14 11:42:44,748 INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) [DEPLOYING]
2020-04-14 11:42:44,751 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) [DEPLOYING].
2020-04-14 11:42:44,752 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot ed04b5323aa885406201e85c9f8b7c78.
2020-04-14 11:42:44,772 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Registered new local state store with configuration LocalRecoveryConfig{localRecoveryMode=false, localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/data/flink1_10/tmp/localState/aid_ed04b5323aa885406201e85c9f8b7c78], jobID=6fd13de6e9c84a51425f7cc34ce94940, jobVertexID=20ba6b65f97481d5570070de90e4e791, subtaskIndex=0}} for 6fd13de6e9c84a51425f7cc34ce94940 - 20ba6b65f97481d5570070de90e4e791 - 0 under allocation id ed04b5323aa885406201e85c9f8b7c78.
2020-04-14 11:42:44,786 DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d): Created 1 input channels (local: 1, remote: 0, unknown: 0).
2020-04-14 11:42:44,788 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task KeyedProcess -> Sink: Unnamed (1/1).
2020-04-14 11:42:44,795 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) switched from CREATED to DEPLOYING.
2020-04-14 11:42:44,805 INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING]
2020-04-14 11:42:44,812 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING].
2020-04-14 11:42:44,817 DEBUG org.apache.flink.runtime.blob.FileSystemBlobStore - Copying from hdfs://nameservice1/data/flink1_10/ha/flink1_10_0/blob/job_6fd13de6e9c84a51425f7cc34ce94940/blob_p-6581a081d862993cf5a06573dbb6621fef1e46b2-f795a9ecd636e88bdf7ddd7746b9ca06 to /data/flink1_10/tmp/blobStore-e924cf2e-5e6c-48c2-893e-c2e9c0a809b6/incoming/temp-00000000.
2020-04-14 11:42:45,060 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from 4ff74d2e5ff4f66a88688fdeafd2d3ec.
2020-04-14 11:42:45,919 DEBUG org.apache.flink.runtime.taskmanager.Task - Getting user code class loader for task ee17273414060c57d2d331a83d1a84fc at library cache manager took 1167 milliseconds
2020-04-14 11:42:45,920 DEBUG org.apache.flink.runtime.taskmanager.Task - Getting user code class loader for task 406f2d0b26fb4b1040ae5ac00028202d at library cache manager took 1108 milliseconds
2020-04-14 11:42:45,931 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) [DEPLOYING].
2020-04-14 11:42:45,931 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING].
2020-04-14 11:42:45,934 DEBUG org.apache.flink.runtime.io.network.buffer.LocalBufferPool - Using a local buffer pool with 2-10 buffers
2020-04-14 11:42:45,934 DEBUG org.apache.flink.runtime.io.network.buffer.LocalBufferPool - Using a local buffer pool with 0-8 buffers
2020-04-14 11:42:45,935 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Registered ReleaseOnConsumptionResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc [PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions].
2020-04-14 11:42:45,935 DEBUG org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel - LocalInputChannel [0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc]: Requesting LOCAL subpartition 0 of partition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc.
2020-04-14 11:42:45,935 DEBUG org.apache.flink.runtime.io.network.TaskEventDispatcher - registering 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc
2020-04-14 11:42:45,935 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Requesting subpartition 0 of ReleaseOnConsumptionResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc [PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions].
2020-04-14 11:42:45,935 DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Creating read view for subpartition 0 of partition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc.
2020-04-14 11:42:45,937 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Created PipelinedSubpartitionView(index: 0) of ResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc
2020-04-14 11:42:45,961 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) switched from DEPLOYING to RUNNING.
2020-04-14 11:42:45,963 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Initializing KeyedProcess -> Sink: Unnamed (1/1).
2020-04-14 11:42:45,967 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Loading state backend via factory org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2020-04-14 11:42:45,984 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Using partitioner HASH for output 0 of task Source: Custom Source
2020-04-14 11:42:45,992 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using predefined options: DEFAULT.
2020-04-14 11:42:45,992 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using default options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-04-14 11:42:45,994 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) switched from DEPLOYING to RUNNING.
2020-04-14 11:42:45,994 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Initializing Source: Custom Source (1/1).
2020-04-14 11:42:45,994 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Loading state backend via factory org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2020-04-14 11:42:45,995 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using predefined options: DEFAULT.
2020-04-14 11:42:45,995 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using default options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-04-14 11:42:46,033 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Invoking Source: Custom Source (1/1)
2020-04-14 11:42:46,042 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating operator state backend for StreamSource_bc764cd8ddf7a0cff126f51c16239658_(1/1) with empty state.
2020-04-14 11:42:46,057 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Invoking KeyedProcess -> Sink: Unnamed (1/1)
2020-04-14 11:42:46,060 DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl - Operator c09dc291fad93d575e015871097bfc60 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=StateObjectCollection{[]}, stateSize=0} from job manager and local state alternatives [] from local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.
2020-04-14 11:42:46,060 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating operator state backend for StreamSink_c09dc291fad93d575e015871097bfc60_(1/1) with empty state.
2020-04-14 11:42:46,069 DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl - Operator 20ba6b65f97481d5570070de90e4e791 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[IncrementalRemoteKeyedStateHandle{backendIdentifier=04ac09d6-1f1f-4a6c-a78d-74090c83b3c7, keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, checkpointId=1, sharedState={}, privateState={MANIFEST-000006=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/2ff261b8-f51c-42bf-9fab-93c6b119dcff', dataBytes=206}, OPTIONS-000010=File State: hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/426c66a6-d32e-43c8-9873-550237ee0963 [10379 bytes], CURRENT=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/bbbce7c9-ea02-4590-9b18-d7a322deb2f4', dataBytes=16}}, metaStateHandle=File State: hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/chk-1/9215630d-632e-48f6-b668-7dc235a8ff7a [1163 bytes], registered=false}]}, keyedStateFromStream=StateObjectCollection{[]}, stateSize=11764} from job manager and local state alternatives [] from local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.
2020-04-14 11:42:46,070 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating keyed state backend for KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) and restoring with state from alternative (1/1).
2020-04-14 11:42:46,071 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Attempting to load RocksDB native library and store it under '/data/flink1_10/tmp'
2020-04-14 11:42:46,071 DEBUG org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Attempting to create RocksDB native library folder /data/flink1_10/tmp/rocksdb-lib-a5f35d4dd06539876a20dbabc82a7f33
2020-04-14 11:42:46,078 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf - -Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkAccessible: true
2020-04-14 11:42:46,079 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf - -Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkBounds: true
2020-04-14 11:42:46,080 DEBUG org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector@28a9bbee
2020-04-14 11:42:46,150 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Successfully loaded RocksDB native library
2020-04-14 11:42:46,154 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Getting managed memory shared cache for RocksDB.
2020-04-14 11:42:46,161 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Obtained shared RocksDB cache of size 53687092 bytes
2020-04-14 11:42:46,495 DEBUG org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation - Restoring keyed backend uid in operator KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from incremental snapshot to 04ac09d6-1f1f-4a6c-a78d-74090c83b3c7.
2020-04-14 11:42:46,571 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)
    at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
    at java.nio.file.Files.copy(Files.java:1274)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
2020-04-14 11:42:46,576 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)
    at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
    at java.nio.file.Files.copy(Files.java:1274)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
    ... 15 more
2020-04-14 11:42:46,579 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)
    at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
    at java.nio.file.Files.copy(Files.java:1274)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
    ... 15 more
2020-04-14 11:42:46,588 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d).
2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.taskmanager.Task - Release task KeyedProcess -> Sink: Unnamed (1/1) network resources (state: FAILED).
2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d): Releasing org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate@23ae29cd.
2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - ReleaseOnConsumptionResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc [PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions]: Received consumed notification for subpartition 0.
2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Received consume notification from ReleaseOnConsumptionResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc [PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions].
2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Releasing ReleaseOnConsumptionResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc [PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions].
2020-04-14 11:42:46,590 DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Released PipelinedSubpartition#0 [number of buffers: 1 (0 bytes), number of buffers in backlog: 1, finished? false, read view? false].
2020-04-14 11:42:46,590 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Released partition 0d224b8294583b8fcdf469150870d2a4 produced by ee17273414060c57d2d331a83d1a84fc.
2020-04-14 11:42:46,590 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [FAILED]
2020-04-14 11:42:46,603 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task KeyedProcess -> Sink: Unnamed (1/1) 406f2d0b26fb4b1040ae5ac00028202d.

On 2020-04-03 18:09:19, "Congxian Qiu" <[hidden email]> wrote:
>The path on HDFS is not the same as the local path. If you want to look at the HDFS path, you would probably need to inspect the checkpoint metadata, which is rather tedious; you can refer to the tests in
>CheckpointMetadataLoadingTest.
>I took another look at the TM log you sent. It looks like outputStream.close() at line 148 failed (one strange thing is that the
>outputStream here is a local file, yet the stack trace shows HadoopFileSystem). Can you reproduce this reliably? If so, could you enable
>DEBUG logging and post the JM/TM logs? A job that reproduces the problem would be even better.
>
>Best,
>Congxian
>
>
>chenxyz <[hidden email]> wrote on Wednesday, April 1, 2020 at 5:18 PM:
>
>> Hi Congxian,
>> I checked HDFS. The directory
>> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160
>> is empty, and there is no db subdirectory in it either.
>>
>> On 2020-04-01 16:50:13, "Congxian Qiu" <[hidden email]> wrote:
>> >Hi
>> >Restore can be roughly divided into two parts: 1) downloading the files; 2) restoring from the downloaded files.
>> >Judging from the TM log, the download seems to have failed. Could you double-check whether the file
>> >/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
>> >exists? If the download did fail, you need to find out why.
>> >
>> >Best,
>> >Congxian
>> >
>> >
>> >chenxyz <[hidden email]> wrote on Wednesday, April 1, 2020 at 3:02 PM:
>> >
>> >> The job uses RocksDB as its state backend. When the job restarts after an exception, it often fails with "Could not restore keyed state backend for KeyedProcessOperator". How can this be fixed?
>> >>
>> >> Version: 1.10, standalone
>> >>
>> >> Configuration:
>> >> state.backend: rocksdb
>> >> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
>> >> state.backend.incremental: true
>> >> jobmanager.execution.failover-strategy: region
>> >> io.tmp.dirs: /data/flink1_10/tmp
>> >>
>> >> Checkpoint configuration of the job:
>> >> env.enableCheckpointing(2 * 60 * 1000);
>> >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
>> >> env.getCheckpointConfig().setCheckpointTimeout(60000);
>> >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
>> >> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> >>
>> >> Log:
>> >>
>> >> 2020-04-01 11:13:03
>> >> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>> >> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> >> at java.lang.Thread.run(Thread.java:748)
>> >> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the 1 provided restore options.
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>> >> ... 9 more
>> >> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>> >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> >> ... 11 more
>> >> Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst -> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
>> >> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>> >> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>> >> at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
>> >> at java.nio.file.Files.createLink(Files.java:1086)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>> >> ... 15 more
>> >>
>> >> TaskManager error log:
>> >>
>> >> 2020-04-01 14:48:10,726 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
>> >> java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
>> >> at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
>> >> at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
>> >> at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
>> >> at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>> >> at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>> >> at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>> >> at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>> >> at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>> >> at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>> >> at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>> >> at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
>> >> at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>> >> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> >> at java.lang.Thread.run(Thread.java:748)
>> >> 2020-04-01 14:48:10,726 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedProcessOperator_0cead2d40df6e304d3168f6366c79e3f_(1/2) from alternative (1/1), will retry while more alternatives are available.
>> >> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>> >> >> >> at >> >> >> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336) >> >> >> >> at >> >> >> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548) >> >> >> >> at >> >> >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) >> >> >> >> at >> >> >> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) >> >> >> >> at >> >> >> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) >> >> >> >> at >> >> >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304) >> >> >> >> at >> >> >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131) >> >> >> >> at >> >> >> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) >> >> >> >> at >> >> >> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) >> >> >> >> at >> >> >> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) >> >> >> >> at >> >> >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) >> >> >> >> at >> >> >> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) >> >> >> >> at >> >> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) >> >> >> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) >> >> >> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) >> >> >> >> at 
java.lang.Thread.run(Thread.java:748) >> >> >> >> Caused by: java.io.InterruptedIOException: Interrupted while waiting >> for >> >> data to be acknowledged by pipeline >> >> >> >> at >> >> >> org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147) >> >> >> >> at >> >> >> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128) >> >> >> >> at >> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229) >> >> >> >> at >> >> >> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) >> >> >> >> at >> >> >> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) >> >> >> >> at >> >> >> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) >> >> >> >> at >> >> >> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) >> >> >> >> at >> >> >> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148) >> >> >> >> at >> >> >> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109) >> >> >> >> at >> >> >> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50) >> >> >> >> at >> >> >> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) >> >> >> >> at >> >> >> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211) >> >> >> >> at >> >> >> java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640) >> >> >> >> at >> >> >> java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858) >> >> >> >> at >> >> >> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83) >> >> >> >> at >> >> >> 
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67) >> >> >> >> at >> >> >> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230) >> >> >> >> at >> >> >> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195) >> >> >> >> at >> >> >> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168) >> >> >> >> at >> >> >> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154) >> >> >> >> at >> >> >> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279) >> >> >> >> ... 15 more >> |