我遇到的问题现象是这样的
1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。 flink run -d -s hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx /tmp/flink-1.0-SNAPSHOT.jar -c com.test.myStream --profile prod 2、flink-conf.xml state.checkpoints.dir: hdfs:///user/flink/checkpoints/default 3、代码checkpoint设置 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10)); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.enableCheckpointing(1 * 60 * 1000); checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); checkpointConfig.setTolerableCheckpointFailureNumber(100); checkpointConfig.setCheckpointTimeout(60 * 1000); checkpointConfig.setMaxConcurrentCheckpoints(1); 4、问题现象 a)运维同事切换yarn resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器 b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200 c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 restore,从日志中看还是从chk-100 restore的。 d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String> sourceMilApplysLogStream = MySQLSource.<String>builder() 重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费 e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗? 2021-05-24 16:49:50,398 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.savepoint.path, hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。 |
这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗?
1、从savepoint恢复; 2、作业开始定期做savepoint; 3、作业failover。 如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。 如果还是有问题,需要通过日志来排查了。 董建 <[hidden email]> 于2021年5月28日周五 下午5:37写道: > 我遇到的问题现象是这样的 > > > > > 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。 > > > > > flink run -d -s > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx > /tmp/flink-1.0-SNAPSHOT.jar -c com.test.myStream --profile prod > > > > > 2、flink-conf.xml > > > > > state.checkpoints.dir: hdfs:///user/flink/checkpoints/default > > > > > 3、代码checkpoint设置 > > > > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > > > > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, > 10)); > > > > > CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > > > > > > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > > > > env.enableCheckpointing(1 * 60 * 1000); > > > > > > checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > > > > > checkpointConfig.setTolerableCheckpointFailureNumber(100); > > > > > checkpointConfig.setCheckpointTimeout(60 * 1000); > > > > > checkpointConfig.setMaxConcurrentCheckpoints(1); > > > > > 4、问题现象 > > > > > a)运维同事切换yarn > resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器 > > > > > > b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200 > > > > > c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 > restore,从日志中看还是从chk-100 restore的。 > > > > > d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String> > sourceMilApplysLogStream = MySQLSource.<String>builder() > > > > > 重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费 > > > > > e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗? > > > > > 2021-05-24 16:49:50,398 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: execution.savepoint.path, > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > > > > 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费 > > > > > 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。 |
稳定复现
checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。 我们jobmanager没有做ha,不知道是否是这个原因导致的? 日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。 目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。 >> org.apache.flink.configuration.GlobalConfiguration [] - Loading >> configuration property: execution.savepoint.path, >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 在 2021-05-28 18:15:38,"刘建刚" <[hidden email]> 写道: >这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗? >1、从savepoint恢复; >2、作业开始定期做savepoint; >3、作业failover。 >如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。 >如果还是有问题,需要通过日志来排查了。 > >董建 <[hidden email]> 于2021年5月28日周五 下午5:37写道: > >> 我遇到的问题现象是这样的 >> >> >> >> >> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。 >> >> >> >> >> flink run -d -s >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx >> /tmp/flink-1.0-SNAPSHOT.jar -c com.test.myStream --profile prod >> >> >> >> >> 2、flink-conf.xml >> >> >> >> >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default >> >> >> >> >> 3、代码checkpoint设置 >> >> >> >> >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> >> >> >> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, >> 10)); >> >> >> >> >> CheckpointConfig checkpointConfig = env.getCheckpointConfig(); >> >> >> >> >> >> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); >> >> >> >> >> env.enableCheckpointing(1 * 60 * 1000); >> >> >> >> >> >> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); >> >> >> >> >> checkpointConfig.setTolerableCheckpointFailureNumber(100); >> >> >> >> >> checkpointConfig.setCheckpointTimeout(60 * 1000); >> >> >> >> >> checkpointConfig.setMaxConcurrentCheckpoints(1); >> >> >> >> >> 4、问题现象 >> >> >> >> >> a)运维同事切换yarn >> resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器 >> >> >> >> >> >> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200 >> >> >> >> >> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 >> restore,从日志中看还是从chk-100 restore的。 >> >> >> >> >> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String> >> sourceMilApplysLogStream = MySQLSource.<String>builder() >> >> >> >> >> 重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费 >> >> >> >> >> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗? >> >> >> >> >> 2021-05-24 16:49:50,398 INFO >> org.apache.flink.configuration.GlobalConfiguration [] - Loading >> configuration property: execution.savepoint.path, >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 >> >> >> >> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费 >> >> >> >> >> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。 |
那应该是master failover后把快照信息丢失了,ha应该能解决这个问题。
董建 <[hidden email]> 于2021年5月28日周五 下午6:24写道: > 稳定复现 > checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。 > 我们jobmanager没有做ha,不知道是否是这个原因导致的? > 日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。 > 目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。 > >> org.apache.flink.configuration.GlobalConfiguration [] - > Loading > >> configuration property: execution.savepoint.path, > >> > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > > > > > > > > > > > > > > > 在 2021-05-28 18:15:38,"刘建刚" <[hidden email]> 写道: > >这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗? > >1、从savepoint恢复; > >2、作业开始定期做savepoint; > >3、作业failover。 > >如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。 > >如果还是有问题,需要通过日志来排查了。 > > > >董建 <[hidden email]> 于2021年5月28日周五 下午5:37写道: > > > >> 我遇到的问题现象是这样的 > >> > >> > >> > >> > >> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。 > >> > >> > >> > >> > >> flink run -d -s > >> > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx > >> /tmp/flink-1.0-SNAPSHOT.jar -c com.test.myStream --profile prod > >> > >> > >> > >> > >> 2、flink-conf.xml > >> > >> > >> > >> > >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default > >> > >> > >> > >> > >> 3、代码checkpoint设置 > >> > >> > >> > >> > >> StreamExecutionEnvironment env = > >> StreamExecutionEnvironment.getExecutionEnvironment(); > >> > >> > >> > >> > >> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, > >> 10)); > >> > >> > >> > >> > >> CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > >> > >> > >> > >> > >> > >> > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > >> > >> > >> > >> > >> env.enableCheckpointing(1 * 60 * 1000); > >> > >> > >> > >> > >> > >> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > >> > >> > >> > >> > >> checkpointConfig.setTolerableCheckpointFailureNumber(100); > >> > >> > >> > >> > >> checkpointConfig.setCheckpointTimeout(60 * 1000); > >> > >> > >> > >> > >> checkpointConfig.setMaxConcurrentCheckpoints(1); > >> > >> > >> > >> > >> 4、问题现象 > >> > >> > >> > >> > >> a)运维同事切换yarn > >> > resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器 > >> > >> > >> > >> > >> > >> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200 > >> > >> > >> > >> > >> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 > >> restore,从日志中看还是从chk-100 restore的。 > >> > >> > >> > >> > >> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String> > >> sourceMilApplysLogStream = MySQLSource.<String>builder() > >> > >> > >> > >> > >> 重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费 > >> > >> > >> > >> > >> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗? > >> > >> > >> > >> > >> 2021-05-24 16:49:50,398 INFO > >> org.apache.flink.configuration.GlobalConfiguration [] - > Loading > >> configuration property: execution.savepoint.path, > >> > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > >> > >> > >> > >> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费 > >> > >> > >> > >> > >> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。 > |
HA在ZK里面记录了最后一次成功的checkpoint counter和地址,没有启用HA的话,就是从指定的savepoint恢复的。
Best, Yang 刘建刚 <[hidden email]> 于2021年5月28日周五 下午6:51写道: > 那应该是master failover后把快照信息丢失了,ha应该能解决这个问题。 > > 董建 <[hidden email]> 于2021年5月28日周五 下午6:24写道: > > > 稳定复现 > > checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。 > > 我们jobmanager没有做ha,不知道是否是这个原因导致的? > > 日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。 > > 目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。 > > >> org.apache.flink.configuration.GlobalConfiguration [] - > > Loading > > >> configuration property: execution.savepoint.path, > > >> > > > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 在 2021-05-28 18:15:38,"刘建刚" <[hidden email]> 写道: > > >这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗? > > >1、从savepoint恢复; > > >2、作业开始定期做savepoint; > > >3、作业failover。 > > >如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。 > > >如果还是有问题,需要通过日志来排查了。 > > > > > >董建 <[hidden email]> 于2021年5月28日周五 下午5:37写道: > > > > > >> 我遇到的问题现象是这样的 > > >> > > >> > > >> > > >> > > >> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。 > > >> > > >> > > >> > > >> > > >> flink run -d -s > > >> > > > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > > >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx > > >> /tmp/flink-1.0-SNAPSHOT.jar -c com.test.myStream --profile prod > > >> > > >> > > >> > > >> > > >> 2、flink-conf.xml > > >> > > >> > > >> > > >> > > >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default > > >> > > >> > > >> > > >> > > >> 3、代码checkpoint设置 > > >> > > >> > > >> > > >> > > >> StreamExecutionEnvironment env = > > >> StreamExecutionEnvironment.getExecutionEnvironment(); > > >> > > >> > > >> > > >> > > >> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, > > >> 10)); > > >> > > >> > > >> > > >> > > >> CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > > >> > > >> > > >> > > >> > > >> > > >> > > > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > >> > > >> > > >> > > >> > > >> env.enableCheckpointing(1 * 60 * 1000); > > >> > > >> > > >> > > >> > > >> > > >> > checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > > >> > > >> > > >> > > >> > > >> checkpointConfig.setTolerableCheckpointFailureNumber(100); > > >> > > >> > > >> > > >> > > >> checkpointConfig.setCheckpointTimeout(60 * 1000); > > >> > > >> > > >> > > >> > > >> checkpointConfig.setMaxConcurrentCheckpoints(1); > > >> > > >> > > >> > > >> > > >> 4、问题现象 > > >> > > >> > > >> > > >> > > >> a)运维同事切换yarn > > >> > > > resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器 > > >> > > >> > > >> > > >> > > >> > > >> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200 > > >> > > >> > > >> > > >> > > >> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 > > >> restore,从日志中看还是从chk-100 restore的。 > > >> > > >> > > >> > > >> > > >> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String> > > >> sourceMilApplysLogStream = MySQLSource.<String>builder() > > >> > > >> > > >> > > >> > > >> 重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费 > > >> > > >> > > >> > > >> > > >> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗? > > >> > > >> > > >> > > >> > > >> 2021-05-24 16:49:50,398 INFO > > >> org.apache.flink.configuration.GlobalConfiguration [] - > > Loading > > >> configuration property: execution.savepoint.path, > > >> > > > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > > >> > > >> > > >> > > >> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费 > > >> > > >> > > >> > > >> > > >> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。 > > > |
Free forum by Nabble | Edit this page |