flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复

董建
我遇到的问题现象是这样的




1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。




flink run -d -s hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod




2、flink-conf.xml




state.checkpoints.dir: hdfs:///user/flink/checkpoints/default




3、代码checkpoint设置




   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();




       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10));




       CheckpointConfig checkpointConfig = env.getCheckpointConfig();




       checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);




       env.enableCheckpointing(1 * 60 * 1000);




       checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);




       checkpointConfig.setTolerableCheckpointFailureNumber(100);




       checkpointConfig.setCheckpointTimeout(60 * 1000);




       checkpointConfig.setMaxConcurrentCheckpoints(1);




4、问题现象




a)运维同事切换yarn resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器




      b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200




c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 restore,从日志中看还是从chk-100 restore的。




d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String> sourceMilApplysLogStream = MySQLSource.<String>builder()




  重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费




e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?




2021-05-24 16:49:50,398 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.savepoint.path, hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100



预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费




现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。
Reply | Threaded
Open this post in threaded view
|

Re: flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复

liujiangang
这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗?
1、从savepoint恢复;
2、作业开始定期做savepoint;
3、作业failover。
如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。
如果还是有问题,需要通过日志来排查了。

董建 <[hidden email]> 于2021年5月28日周五 下午5:37写道:

> 我遇到的问题现象是这样的
>
>
>
>
> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。
>
>
>
>
> flink run -d -s
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx
> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
>
>
>
>
> 2、flink-conf.xml
>
>
>
>
> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
>
>
>
>
> 3、代码checkpoint设置
>
>
>
>
>    StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>
>
>
>        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100,
> 10));
>
>
>
>
>        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>
>
>
>
>
>  checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
>
>
>        env.enableCheckpointing(1 * 60 * 1000);
>
>
>
>
>
>  checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
>
>
>
>        checkpointConfig.setTolerableCheckpointFailureNumber(100);
>
>
>
>
>        checkpointConfig.setCheckpointTimeout(60 * 1000);
>
>
>
>
>        checkpointConfig.setMaxConcurrentCheckpoints(1);
>
>
>
>
> 4、问题现象
>
>
>
>
> a)运维同事切换yarn
> resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器
>
>
>
>
>
> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200
>
>
>
>
> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200
> restore,从日志中看还是从chk-100 restore的。
>
>
>
>
> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String>
> sourceMilApplysLogStream = MySQLSource.<String>builder()
>
>
>
>
>   重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费
>
>
>
>
> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?
>
>
>
>
> 2021-05-24 16:49:50,398 INFO
> org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: execution.savepoint.path,
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>
>
>
> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费
>
>
>
>
> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。
Reply | Threaded
Open this post in threaded view
|

Re:Re: flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复

董建
稳定复现
checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。
我们jobmanager没有做ha,不知道是否是这个原因导致的?
日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。
目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。
>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: execution.savepoint.path,
>> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100














在 2021-05-28 18:15:38,"刘建刚" <[hidden email]> 写道:

>这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗?
>1、从savepoint恢复;
>2、作业开始定期做savepoint;
>3、作业failover。
>如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。
>如果还是有问题,需要通过日志来排查了。
>
>董建 <[hidden email]> 于2021年5月28日周五 下午5:37写道:
>
>> 我遇到的问题现象是这样的
>>
>>
>>
>>
>> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。
>>
>>
>>
>>
>> flink run -d -s
>> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>> -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx
>> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
>>
>>
>>
>>
>> 2、flink-conf.xml
>>
>>
>>
>>
>> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
>>
>>
>>
>>
>> 3、代码checkpoint设置
>>
>>
>>
>>
>>    StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>
>>
>>
>>        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100,
>> 10));
>>
>>
>>
>>
>>        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>
>>
>>
>>
>>
>>  checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>>
>>
>>
>>        env.enableCheckpointing(1 * 60 * 1000);
>>
>>
>>
>>
>>
>>  checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>
>>
>>
>>
>>        checkpointConfig.setTolerableCheckpointFailureNumber(100);
>>
>>
>>
>>
>>        checkpointConfig.setCheckpointTimeout(60 * 1000);
>>
>>
>>
>>
>>        checkpointConfig.setMaxConcurrentCheckpoints(1);
>>
>>
>>
>>
>> 4、问题现象
>>
>>
>>
>>
>> a)运维同事切换yarn
>> resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器
>>
>>
>>
>>
>>
>> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200
>>
>>
>>
>>
>> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200
>> restore,从日志中看还是从chk-100 restore的。
>>
>>
>>
>>
>> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String>
>> sourceMilApplysLogStream = MySQLSource.<String>builder()
>>
>>
>>
>>
>>   重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费
>>
>>
>>
>>
>> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?
>>
>>
>>
>>
>> 2021-05-24 16:49:50,398 INFO
>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: execution.savepoint.path,
>> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>>
>>
>>
>> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费
>>
>>
>>
>>
>> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复

liujiangang
那应该是master failover后把快照信息丢失了,ha应该能解决这个问题。

董建 <[hidden email]> 于2021年5月28日周五 下午6:24写道:

> 稳定复现
> checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。
> 我们jobmanager没有做ha,不知道是否是这个原因导致的?
> 日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。
> 目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。
> >> org.apache.flink.configuration.GlobalConfiguration           [] -
> Loading
> >> configuration property: execution.savepoint.path,
> >>
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-05-28 18:15:38,"刘建刚" <[hidden email]> 写道:
> >这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗?
> >1、从savepoint恢复;
> >2、作业开始定期做savepoint;
> >3、作业failover。
> >如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。
> >如果还是有问题,需要通过日志来排查了。
> >
> >董建 <[hidden email]> 于2021年5月28日周五 下午5:37写道:
> >
> >> 我遇到的问题现象是这样的
> >>
> >>
> >>
> >>
> >> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。
> >>
> >>
> >>
> >>
> >> flink run -d -s
> >>
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx
> >> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
> >>
> >>
> >>
> >>
> >> 2、flink-conf.xml
> >>
> >>
> >>
> >>
> >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
> >>
> >>
> >>
> >>
> >> 3、代码checkpoint设置
> >>
> >>
> >>
> >>
> >>    StreamExecutionEnvironment env =
> >> StreamExecutionEnvironment.getExecutionEnvironment();
> >>
> >>
> >>
> >>
> >>        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100,
> >> 10));
> >>
> >>
> >>
> >>
> >>        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> >>
> >>
> >>
> >>
> >>
> >>
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >>
> >>
> >>
> >>
> >>        env.enableCheckpointing(1 * 60 * 1000);
> >>
> >>
> >>
> >>
> >>
> >>  checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >>
> >>
> >>
> >>
> >>        checkpointConfig.setTolerableCheckpointFailureNumber(100);
> >>
> >>
> >>
> >>
> >>        checkpointConfig.setCheckpointTimeout(60 * 1000);
> >>
> >>
> >>
> >>
> >>        checkpointConfig.setMaxConcurrentCheckpoints(1);
> >>
> >>
> >>
> >>
> >> 4、问题现象
> >>
> >>
> >>
> >>
> >> a)运维同事切换yarn
> >>
> resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器
> >>
> >>
> >>
> >>
> >>
> >> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200
> >>
> >>
> >>
> >>
> >> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200
> >> restore,从日志中看还是从chk-100 restore的。
> >>
> >>
> >>
> >>
> >> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String>
> >> sourceMilApplysLogStream = MySQLSource.<String>builder()
> >>
> >>
> >>
> >>
> >>   重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费
> >>
> >>
> >>
> >>
> >> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?
> >>
> >>
> >>
> >>
> >> 2021-05-24 16:49:50,398 INFO
> >> org.apache.flink.configuration.GlobalConfiguration           [] -
> Loading
> >> configuration property: execution.savepoint.path,
> >>
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >>
> >>
> >>
> >> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费
> >>
> >>
> >>
> >>
> >> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复

Yang Wang
HA在ZK里面记录了最后一次成功的checkpoint counter和地址,没有启用HA的话,就是从指定的savepoint恢复的。


Best,
Yang

刘建刚 <[hidden email]> 于2021年5月28日周五 下午6:51写道:

> 那应该是master failover后把快照信息丢失了,ha应该能解决这个问题。
>
> 董建 <[hidden email]> 于2021年5月28日周五 下午6:24写道:
>
> > 稳定复现
> > checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。
> > 我们jobmanager没有做ha,不知道是否是这个原因导致的?
> > 日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。
> > 目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。
> > >> org.apache.flink.configuration.GlobalConfiguration           [] -
> > Loading
> > >> configuration property: execution.savepoint.path,
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2021-05-28 18:15:38,"刘建刚" <[hidden email]> 写道:
> > >这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗?
> > >1、从savepoint恢复;
> > >2、作业开始定期做savepoint;
> > >3、作业failover。
> > >如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。
> > >如果还是有问题,需要通过日志来排查了。
> > >
> > >董建 <[hidden email]> 于2021年5月28日周五 下午5:37写道:
> > >
> > >> 我遇到的问题现象是这样的
> > >>
> > >>
> > >>
> > >>
> > >> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。
> > >>
> > >>
> > >>
> > >>
> > >> flink run -d -s
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx
> > >> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
> > >>
> > >>
> > >>
> > >>
> > >> 2、flink-conf.xml
> > >>
> > >>
> > >>
> > >>
> > >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
> > >>
> > >>
> > >>
> > >>
> > >> 3、代码checkpoint设置
> > >>
> > >>
> > >>
> > >>
> > >>    StreamExecutionEnvironment env =
> > >> StreamExecutionEnvironment.getExecutionEnvironment();
> > >>
> > >>
> > >>
> > >>
> > >>        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100,
> > >> 10));
> > >>
> > >>
> > >>
> > >>
> > >>        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> >
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > >>
> > >>
> > >>
> > >>
> > >>        env.enableCheckpointing(1 * 60 * 1000);
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > >>
> > >>
> > >>
> > >>
> > >>        checkpointConfig.setTolerableCheckpointFailureNumber(100);
> > >>
> > >>
> > >>
> > >>
> > >>        checkpointConfig.setCheckpointTimeout(60 * 1000);
> > >>
> > >>
> > >>
> > >>
> > >>        checkpointConfig.setMaxConcurrentCheckpoints(1);
> > >>
> > >>
> > >>
> > >>
> > >> 4、问题现象
> > >>
> > >>
> > >>
> > >>
> > >> a)运维同事切换yarn
> > >>
> >
> resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200
> > >>
> > >>
> > >>
> > >>
> > >> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200
> > >> restore,从日志中看还是从chk-100 restore的。
> > >>
> > >>
> > >>
> > >>
> > >> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String>
> > >> sourceMilApplysLogStream = MySQLSource.<String>builder()
> > >>
> > >>
> > >>
> > >>
> > >>   重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费
> > >>
> > >>
> > >>
> > >>
> > >> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?
> > >>
> > >>
> > >>
> > >>
> > >> 2021-05-24 16:49:50,398 INFO
> > >> org.apache.flink.configuration.GlobalConfiguration           [] -
> > Loading
> > >> configuration property: execution.savepoint.path,
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >>
> > >>
> > >>
> > >> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费
> > >>
> > >>
> > >>
> > >>
> > >> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。
> >
>