Configuration code:

    env.enableCheckpointing(1000);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Do not restart the job after it fails
    env.setRestartStrategy(RestartStrategies.noRestart());
    // A checkpoint that does not complete within 500 ms is aborted
    env.getCheckpointConfig().setCheckpointTimeout(500);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Keep the latest checkpoint on disk when the job is cancelled
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));

Code that uses the state:

    private transient ListState<String> counts;

    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ListStateDescriptor<String> lastUserLogin =
                new ListStateDescriptor<>("lastUserLogin", String.class);
        lastUserLogin.enableTimeToLive(ttlConfig);
        counts = getRuntimeContext().getListState(lastUserLogin);
    }

After I restarted the task managers, I found that all the data in counts was lost.
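The TTL settings in open() affect what counts.get() can return after a restart, so a small model may help when reasoning about them. The following is a plain-Java sketch, not Flink code: the class TtlListModel and its methods are hypothetical. It assumes, as we understand Flink's list-state TTL, that each list element carries its own last-write timestamp (OnCreateAndWrite) and that, with NeverReturnExpired, an element is hidden once 30 minutes have passed since it was written. Flink 1.7 measures state TTL in processing (wall-clock) time only, so time during which the job is stopped also counts toward the 30 minutes.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java model (not Flink's implementation) of a ListState<String>
 * with StateTtlConfig.newBuilder(Time.minutes(30)), UpdateType.OnCreateAndWrite
 * and StateVisibility.NeverReturnExpired. Timestamps are passed in explicitly.
 */
class TtlListModel {
    private static final class Entry {
        final String value;
        final long lastWriteMillis; // TTL clock starts when the element is written

        Entry(String value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final List<Entry> entries = new ArrayList<>();

    TtlListModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Models ListState.add: each element gets its own write timestamp. */
    void add(String value, long nowMillis) {
        entries.add(new Entry(value, nowMillis));
    }

    /** Models ListState.get with NeverReturnExpired: expired elements are filtered out. */
    List<String> get(long nowMillis) {
        List<String> visible = new ArrayList<>();
        for (Entry e : entries) {
            // An element expires once write time + TTL <= now.
            if (nowMillis - e.lastWriteMillis < ttlMillis) {
                visible.add(e.value);
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        long ttl = 30 * 60 * 1000L; // mirrors Time.minutes(30)
        TtlListModel state = new TtlListModel(ttl);
        state.add("id-1", 0L);
        state.add("id-2", 20 * 60 * 1000L);
        // 35 minutes after the first write, id-1 has expired but id-2 has not.
        System.out.println(state.get(35 * 60 * 1000L)); // prints [id-2]
    }
}
```

In other words, if the job is down for more than 30 minutes, entries written before the stop would be invisible after a successful restore as well, which is worth ruling out before concluding that the state was not restored.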
Hi
1. Could you describe the loss of data in counts in more detail? What did you expect, and what did you actually see?
2. Could you also post the rest of your code that uses counts?
3. Was your job restored from the checkpoint? You can check this in the JM (JobManager) log.
4. If you are sure that data has been lost, you could use the State Processor API [1] to check whether the problem occurs when the state is written out to the checkpoint or when it is restored.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html

Best,
Congxian

(Replying to sun <[hidden email]>, Thursday, July 16, 2020, 6:16 PM.)
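For point 3, one signal in the JM log, as we understand Flink's CheckpointCoordinator, is checkpoint numbering: when a job is restored from a checkpoint or savepoint, the checkpoint ID counter continues from the restored checkpoint's ID, whereas a job that starts without restoring begins at checkpoint 1. The sketch below assumes JM log lines in the "Triggering checkpoint <id> @ <timestamp> for job <jobId>." format that Flink 1.7 prints. FirstCheckpointScanner is a hypothetical helper, and the check is only a heuristic that is meaningful when the log covers the start of the job.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: records, per job ID, the ID of the first checkpoint that
 * the CheckpointCoordinator triggered according to the JobManager log.
 */
class FirstCheckpointScanner {
    // Assumed Flink 1.7 log format: "Triggering checkpoint 1 @ 1595239715506 for job <32 hex chars>."
    private static final Pattern TRIGGER =
            Pattern.compile("Triggering checkpoint (\\d+) @ \\d+ for job ([0-9a-f]{32})");

    static Map<String, Long> firstCheckpointPerJob(List<String> logLines) {
        Map<String, Long> first = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = TRIGGER.matcher(line);
            if (m.find()) {
                // putIfAbsent keeps only the earliest trigger seen for each job.
                first.putIfAbsent(m.group(2), Long.parseLong(m.group(1)));
            }
        }
        return first;
    }

    /** Heuristic: a job whose first checkpoint is 1 most likely did not restore any state. */
    static boolean likelyStartedFresh(long firstCheckpointId) {
        return firstCheckpointId == 1L;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "INFO CheckpointCoordinator - Triggering checkpoint 117 @ 1595239688615 for job acd456ff6f2f9f59ee89b126503c20f0.",
                "INFO CheckpointCoordinator - Triggering checkpoint 1 @ 1595239715506 for job 6dbecb3e4f536c2c92ca7931cba54fd2.");
        for (Map.Entry<String, Long> e : firstCheckpointPerJob(lines).entrySet()) {
            System.out.println(e.getKey() + " -> first checkpoint " + e.getValue()
                    + (likelyStartedFresh(e.getValue()) ? " (probably started without restored state)" : ""));
        }
    }
}
```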
Hello. I print the contents of counts with this code:

    List<String> list = Lists.newArrayList(counts.get());
    for (String ss : list) {
        System.out.println("!!!" + ss);
        log.info("!!!" + ss);
    }

but after I restart the service, the entries stored before the restart are no longer printed. Here is the full class:

    @Slf4j
    public class FlatMapTestState extends RichFlatMapFunction<String, Test222> {

        // Keyed state: holds one list per key of the upstream keyBy
        private transient ListState<String> counts;

        @Override
        public void open(Configuration parameters) throws Exception {
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.minutes(30))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();

            ListStateDescriptor<String> lastUserLogin =
                    new ListStateDescriptor<>("lastUserLogin", String.class);
            lastUserLogin.enableTimeToLive(ttlConfig);
            counts = getRuntimeContext().getListState(lastUserLogin);
        }

        @Override
        public void flatMap(String s, Collector<Test222> collector) throws Exception {
            Test222 message = JSONUtil.toObject(s, new TypeReference<Test222>() {
            });

            System.out.println(DateUtil.toLongDateString(new Date()));
            log.info(DateUtil.toLongDateString(new Date()));
            // Append this record's ID, then print every entry stored for the current key
            counts.add(message.getId());
            List<String> list = Lists.newArrayList(counts.get());
            for (String ss : list) {
                System.out.println("!!!" + ss);
                log.info("!!!" + ss);
            }
            log.info(DateUtil.toLongDateString(new Date()));
            System.out.println(DateUtil.toLongDateString(new Date()));
        }
    }

(Replying to Congxian via user-zh, Thursday, July 16, 2020, 8:16 PM; subject: "Re: State cannot be restored from checkpoint".)
Hi
1. You still need to answer the question I asked earlier: check the JM log to see whether the job was restored from the checkpoint.
2. Nothing being printed here only shows that the key currently being processed has no state data; it does not mean that the state was not restored. State values are bound to a specific key (the key used in keyBy).

Best,
Congxian

(Replying to sun <[hidden email]>, Friday, July 17, 2020, 5:22 PM.)
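Point 2 can be pictured with a small plain-Java model. This is not Flink's implementation: KeyedListStateModel and setCurrentKey are hypothetical names chosen to mirror how keyed state behaves, where Flink sets the current key before each record reaches flatMap and counts.get() returns only that key's list. Note also that state obtained through getRuntimeContext().getListState(...) is keyed state, so the function has to run on a keyed stream (after keyBy); the thread does not show the keyBy call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model of keyed ListState scoping: one list per key,
 * and add()/get() only touch the list of the key currently being processed.
 */
class KeyedListStateModel {
    private final Map<String, List<String>> perKey = new HashMap<>();
    private String currentKey;

    /** Models Flink switching the current key before invoking flatMap for a record. */
    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    void add(String value) {
        perKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }

    /** Returns only the current key's entries; other keys' entries are not visible. */
    List<String> get() {
        return perKey.getOrDefault(currentKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        KeyedListStateModel state = new KeyedListStateModel();
        // State that existed (or was restored) for key "user-A".
        state.setCurrentKey("user-A");
        state.add("order-1");
        // A record for a different key only sees its own list.
        state.setCurrentKey("user-B");
        state.add("order-2");
        System.out.println(state.get()); // prints [order-2]
        // user-A's entries are still there once a user-A record arrives.
        state.setCurrentKey("user-A");
        System.out.println(state.get()); // prints [order-1]
    }
}
```

So after a restore, entries written for one key are printed only when a record with that same key is processed again.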
I'm not very familiar with the JM log, so I can't tell whether the job was restored from the checkpoint. Here is the log:
18:08:07.615 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 116 @ 1595239687615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:07.628 [flink-akka.actor.default-dispatcher-420] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 116 for job acd456ff6f2f9f59ee89b126503c20f0 (74305 bytes in 13 ms).
18:08:08.615 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 117 @ 1595239688615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:08.626 [flink-akka.actor.default-dispatcher-420] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 117 for job acd456ff6f2f9f59ee89b126503c20f0 (74305 bytes in 11 ms).
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job ty-bi-flink (acd456ff6f2f9f59ee89b126503c20f0) switched from state RUNNING to CANCELLING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/4) (4d8a61b0a71ff37d1e7d7da578878e55) switched from RUNNING to CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/4) (97909ed1fcf34f658a3b6d9b3e8ee412) switched from RUNNING to CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (3/4) (7be70346e0c7fc8f2b2224ca3a0907f0) switched from RUNNING to CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (4/4) (4df2905ee56b06d9fc384e4beb228015) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (1/4) (87d4c7af7d5fb5f81bae48aae77de473) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (2/4) (7dfdd54faf11bc364fb6afc3dfdfb4dd) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (3/4) (9035e059e465b8c520edf37ec734b43e) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (4/4) (e6ff47b0da505b2aa4d775d7821b8356) switched from RUNNING to CANCELING.
18:08:09.377 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (4/4) (e6ff47b0da505b2aa4d775d7821b8356) switched from CANCELING to CANCELED.
18:08:09.377 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (3/4) (9035e059e465b8c520edf37ec734b43e) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (2/4) (7dfdd54faf11bc364fb6afc3dfdfb4dd) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (1/4) (87d4c7af7d5fb5f81bae48aae77de473) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/4) (4d8a61b0a71ff37d1e7d7da578878e55) switched from CANCELING to CANCELED.
18:08:09.379 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/4) (97909ed1fcf34f658a3b6d9b3e8ee412) switched from CANCELING to CANCELED.
18:08:09.379 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (3/4) (7be70346e0c7fc8f2b2224ca3a0907f0) switched from CANCELING to CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-416] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (4/4) (4df2905ee56b06d9fc384e4beb228015) switched from CANCELING to CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job ty-bi-flink (acd456ff6f2f9f59ee89b126503c20f0) switched from state CANCELLING to CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO o.a.f.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint - Checkpoint with ID 117 at 'file:/opt/flink/flink-1.7.2/checkpoints/acd456ff6f2f9f59ee89b126503c20f0/chk-117' not discarded.
18:08:09.382 [flink-akka.actor.default-dispatcher-427] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job acd456ff6f2f9f59ee89b126503c20f0 reached globally terminal state CANCELED.
18:08:09.384 [flink-akka.actor.default-dispatcher-416] INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job ty-bi-flink(acd456ff6f2f9f59ee89b126503c20f0).
18:08:09.385 [flink-akka.actor.default-dispatcher-416] INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 7f7791cdc957a13cfaf639062c495fb9: JobManager is shutting down..
18:08:09.385 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending SlotPool.
18:08:09.385 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping SlotPool.
18:08:09.385 [flink-akka.actor.default-dispatcher-416] INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Disconnect job manager [hidden email]://flink@rcx51101:6123/user/jobmanager_4 for job acd456ff6f2f9f59ee89b126503c20f0 from the resource manager.
18:08:09.385 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManagerRunner already shutdown.
18:08:33.384 [flink-rest-server-netty-worker-thread-4] WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
18:08:34.205 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 6dbecb3e4f536c2c92ca7931cba54fd2 (ty-bi-flink).
18:08:34.205 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_5 .
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobMaster - Initializing job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2).
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobMaster - Using restart strategy NoRestartStrategy for ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2).
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/c8a89ca4-afcc-41c0-b121-bbfe4354e502 .
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2).
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
18:08:34.207 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/flink-1.7.2/checkpoints', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=null, enableIncrementalCheckpointing=UNDEFINED}
18:08:34.207 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobMaster - Configuring application-defined state backend with job/cluster config
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManager runner for job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@rcx51101:6123/user/jobmanager_5.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2)
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2) switched from state CREATED to RUNNING.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/4) (c06f0e753f644bdbcfe50cc8d2364cf6) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/4) (5436dd5759d18472fcf171f5df9d9bc9) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (3/4) (2a8cf04be945d59a70a3d82f50b38cd6) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (4/4) (9797fe0ec397922dff0c8bde4fb89ba2) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (1/4) (4127cdcd8ad7bd2011b7f8a8330663b9) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (2/4) (c0e50cbfbab0b29973cc517056f3f561) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (3/4) (989517f5535736062e6ce870e30742ee) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (4/4) (eaf47a632d3e735f1341e1d6d4ec7b7f) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobMaster - Connecting to ResourceManager akka.tcp://flink@rcx51101:6123/user/resourcemanager(00000000000000000000000000000000)
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{092c50cc73f659cbca805205e07b239c}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{16e0d6c68cbbf62c056758903c129661}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{f727b58cc9c5abe1627216c5973f98b5}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{bb8a60407b3fa9329ccc1ae8454bf239}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved ResourceManager address, beginning registration
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.jobmaster.JobMaster - Registration at ResourceManager attempt 1 (timeout=100ms)
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Registering job manager [hidden email]://flink@rcx51101:6123/user/jobmanager_5 for job 6dbecb3e4f536c2c92ca7931cba54fd2.
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Registered job manager [hidden email]://flink@rcx51101:6123/user/jobmanager_5 for job 6dbecb3e4f536c2c92ca7931cba54fd2.
18:08:34.209 [flink-akka.actor.default-dispatcher-420] INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
18:08:34.209 [flink-akka.actor.default-dispatcher-420] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Requesting new slot [SlotRequestId{bb8a60407b3fa9329ccc1ae8454bf239}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-420] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Requesting new slot [SlotRequestId{f727b58cc9c5abe1627216c5973f98b5}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id AllocationID{db118025945481bba66b8ffa734e4202}.
18:08:34.210 [flink-akka.actor.default-dispatcher-420] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Requesting new slot [SlotRequestId{16e0d6c68cbbf62c056758903c129661}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-420] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Requesting new slot [SlotRequestId{092c50cc73f659cbca805205e07b239c}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id AllocationID{d5147bcc731a51f09bdb32e366d93b02}.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id AllocationID{14001529dd0d04ebbd169241cb59f918}.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id AllocationID{bb02373b91c626c6fde666512d5b62ed}.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/4) (c06f0e753f644bdbcfe50cc8d2364cf6) switched from SCHEDULED to DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (1/4) (attempt #0) to rcx51102
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (3/4) (2a8cf04be945d59a70a3d82f50b38cd6) switched from SCHEDULED to DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (3/4) (attempt #0) to rcx51102
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/4) (5436dd5759d18472fcf171f5df9d9bc9) switched from SCHEDULED to DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (2/4) (attempt #0) to rcx51102
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (4/4) (9797fe0ec397922dff0c8bde4fb89ba2) switched from SCHEDULED to DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (4/4) (attempt #0) to rcx51102
18:08:34.220 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (4/4) (eaf47a632d3e735f1341e1d6d4ec7b7f) switched from SCHEDULED to DEPLOYING.
18:08:34.220 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying map_sub_order_detail -> Sink: Print to Std. Out (4/4) (attempt #0) to rcx51102
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (3/4) (989517f5535736062e6ce870e30742ee) switched from SCHEDULED to DEPLOYING.
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying map_sub_order_detail -> Sink: Print to Std. Out (3/4) (attempt #0) to rcx51102
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (2/4) (c0e50cbfbab0b29973cc517056f3f561) switched from SCHEDULED to DEPLOYING.
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying map_sub_order_detail -> Sink: Print to Std. Out (2/4) (attempt #0) to rcx51102
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (1/4) (4127cdcd8ad7bd2011b7f8a8330663b9) switched from SCHEDULED to DEPLOYING.
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying map_sub_order_detail -> Sink: Print to Std. Out (1/4) (attempt #0) to rcx51102
18:08:34.506 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source (1/4) of job 6dbecb3e4f536c2c92ca7931cba54fd2 is not in state RUNNING but DEPLOYING instead. Aborting checkpoint.
18:08:35.036 [flink-akka.actor.default-dispatcher-430] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (3/4) (989517f5535736062e6ce870e30742ee) switched from DEPLOYING to RUNNING.
18:08:35.037 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (2/4) (c0e50cbfbab0b29973cc517056f3f561) switched from DEPLOYING to RUNNING.
18:08:35.057 [flink-akka.actor.default-dispatcher-430] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (4/4) (eaf47a632d3e735f1341e1d6d4ec7b7f) switched from DEPLOYING to RUNNING.
18:08:35.058 [flink-akka.actor.default-dispatcher-430] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - map_sub_order_detail -> Sink: Print to Std. Out (1/4) (4127cdcd8ad7bd2011b7f8a8330663b9) switched from DEPLOYING to RUNNING.
18:08:35.069 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (4/4) (9797fe0ec397922dff0c8bde4fb89ba2) switched from DEPLOYING to RUNNING.
18:08:35.070 [flink-akka.actor.default-dispatcher-430] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/4) (c06f0e753f644bdbcfe50cc8d2364cf6) switched from DEPLOYING to RUNNING.
18:08:35.076 [flink-akka.actor.default-dispatcher-418] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (3/4) (2a8cf04be945d59a70a3d82f50b38cd6) switched from DEPLOYING to RUNNING.
18:08:35.076 [flink-akka.actor.default-dispatcher-430] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/4) (5436dd5759d18472fcf171f5df9d9bc9) switched from DEPLOYING to RUNNING.
18:08:35.506 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1595239715506 for job 6dbecb3e4f536c2c92ca7931cba54fd2.
18:08:35.530 [flink-akka.actor.default-dispatcher-430] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 6dbecb3e4f536c2c92ca7931cba54fd2 (74134 bytes in 24 ms).

(Replying to Congxian via user-zh, Friday, July 17, 2020, 10:58 PM.)
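Reading the log above: the resubmitted job received a new job ID (6dbecb3e4f536c2c92ca7931cba54fd2), its first triggered checkpoint is checkpoint 1, and the previous job's chk-117 was retained ("not discarded"). As far as we know, a retained checkpoint is not picked up automatically by a new submission; it has to be passed explicitly, for example with the -s (--fromSavepoint) option of bin/flink run, which accepts a retained checkpoint path as well as a savepoint path. When submitting through the web UI or REST API, as this job was (see the JarRunHandler warning), the equivalent is the savepoint path setting of the jar-run request. The sketch below pulls the retained path out of the log and builds the CLI command; RetainedCheckpointResume is a hypothetical helper, and the jar name is a placeholder.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: finds the last retained checkpoint path in JobManager log
 * lines such as "Checkpoint with ID 117 at 'file:/.../chk-117' not discarded."
 * and builds a "bin/flink run -s <path> <jar>" command that resumes from it.
 */
class RetainedCheckpointResume {
    private static final Pattern RETAINED =
            Pattern.compile("Checkpoint with ID \\d+ at '([^']+)' not discarded");

    /** Returns the path of the last retained checkpoint mentioned in the log, if any. */
    static Optional<String> lastRetainedCheckpoint(List<String> logLines) {
        String path = null;
        for (String line : logLines) {
            Matcher m = RETAINED.matcher(line);
            if (m.find()) {
                path = m.group(1); // later matches overwrite earlier ones
            }
        }
        return Optional.ofNullable(path);
    }

    /** Builds the CLI command; -s points the new job at the retained checkpoint. */
    static String resumeCommand(String checkpointPath, String jarPath) {
        return "bin/flink run -s " + checkpointPath + " " + jarPath;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
                "INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint - Checkpoint with ID 117 at "
                        + "'file:/opt/flink/flink-1.7.2/checkpoints/acd456ff6f2f9f59ee89b126503c20f0/chk-117' not discarded.");
        // "my-job.jar" is a placeholder for the job's actual jar file.
        lastRetainedCheckpoint(log).ifPresent(path -> System.out.println(resumeCommand(path, "my-job.jar")));
    }
}
```

If the restore succeeds, the first checkpoint of the new job should be numbered after the restored one rather than starting again at 1.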