请教个Flink checkpoint的问题

classic Classic list List threaded Threaded
17 messages Options
Reply | Threaded
Open this post in threaded view
|

请教个Flink checkpoint的问题

占英华
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?

Reply | Threaded
Open this post in threaded view
|

回复: 请教个Flink checkpoint的问题

占英华
刚才代码截图没发出去,再贴下代码
streamEnv.enableCheckpointing(5 * 60 * 1000);
CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
checkPointConfig.setMaxConcurrentCheckpoints(1);
checkPointConfig.setTolerableCheckpointFailureNumber(3);
checkPointConfig
        .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);



[hidden email]
 
发件人: [hidden email]
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?


[hidden email]
Reply | Threaded
Open this post in threaded view
|

回复: 请教个Flink checkpoint的问题

Evan
In reply to this post by 占英华
代码图挂掉了,看不到代码



 
发件人: [hidden email]
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?


[hidden email]
Reply | Threaded
Open this post in threaded view
|

回复: 回复: 请教个Flink checkpoint的问题

占英华
In reply to this post by 占英华
代码如下:
    streamEnv.enableCheckpointing(5 * 60 * 1000);
    CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
    checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
    checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
    checkPointConfig.setMaxConcurrentCheckpoints(1);
    checkPointConfig.setTolerableCheckpointFailureNumber(3);
    checkPointConfig
            .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
    try {
      StateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointPath);
      streamEnv.setStateBackend(rocksDBStateBackend);


 
发件人: [hidden email]
发送时间: 2021-01-14 17:55
收件人: [hidden email]
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
 
 
 
发件人: [hidden email]
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
 
 
Reply | Threaded
Open this post in threaded view
|

Re: 请教个Flink checkpoint的问题

tison
In reply to this post by 占英华
没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。

Best,
tison.


Evan <[hidden email]> 于2021年1月14日周四 下午5:56写道:

> 代码图挂掉了,看不到代码
>
>
>
>
> 发件人: [hidden email]
> 发送时间: 2021-01-14 17:26
> 收件人: user-zh
> 主题: 请教个Flink checkpoint的问题
>
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
>
>
> [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: 请教个Flink checkpoint的问题

占英华
[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
log4j:WARN No such property [datePattern] in org.apache.log4j.RollingFileAppender.
21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop library
Found 1 items
-rw-rw-r--   3 yarn hdfs       5388 2021-01-14 17:03 hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata   // 这个是通过JobManger看到已经checkpoing完成后去查询出来的记录,的确是生成了,里面已经包含了_metadata文件
[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6  //我停止任务后再去查询时,这个目录已经删除了,出错如下
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
log4j:WARN No such property [datePattern] in org.apache.log4j.RollingFileAppender.
21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
ls: `hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6': No such file or directory    //出错信息,checkpoint信息被删除了,这个目录我是专门用来存放checkpoint信息,排除其他主动删除该文件的可能





[hidden email]
 
发件人: tison
发送时间: 2021-01-14 18:04
收件人: user-zh
主题: Re: 请教个Flink checkpoint的问题
没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。
 
Best,
tison.
 
 
Evan <[hidden email]> 于2021年1月14日周四 下午5:56写道:
 

> 代码图挂掉了,看不到代码
>
>
>
>
> 发件人: [hidden email]
> 发送时间: 2021-01-14 17:26
> 收件人: user-zh
> 主题: 请教个Flink checkpoint的问题
>
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
>
>
> [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: 请教个Flink checkpoint的问题

占英华
In reply to this post by tison
刚才又操作了一次,我重新截图了放在附件里了,
开始在18:29:29时没有看到chk-8生成,就是在18:29:29时checkpoint没有生成,
然后18:29:34查看时,checkpoint生成了
然后18:29:51查看时,checkpoint还在,此时我停止了那个任务
18:30:11去查看时,checkpoint的chk-8不见了






在 2021-01-14 18:04:27,"tison" <[hidden email]> 写道: >没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。 > >Best, >tison. > > >Evan <[hidden email]> 于2021年1月14日周四 下午5:56写道: > >> 代码图挂掉了,看不到代码 >> >> >> >> >> 发件人: [hidden email] >> 发送时间: 2021-01-14 17:26 >> 收件人: user-zh >> 主题: 请教个Flink checkpoint的问题 >> >> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下 >> >> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息? >> >> >> [hidden email] >>


 

Reply | Threaded
Open this post in threaded view
|

Re: Re: 请教个Flink checkpoint的问题

nobleyd
In reply to this post by 占英华
机制就是这样的。如下是我之前做过的测试。
启动后等待若干检查点之后做如下操作文件系统上的检查点是否保留说明
WEB UI 点击 Cancel 方式取消任务 保留 合理,因为设置了 RETAIN_ON_CANCELLATION。
通过命令生成保存点:flink savepoint ${jobId} ${savepointDir} 保留 OK
通过命令取消任务:flink cancel ${jobId} 保留 OK
通过命令取消任务并生成保存点:flink cancel -s ${savepointDir} ${jobId} 保留 OK
通过命令停止任务(基于默认保存点目录):flink stop ${jobId} *不*保留 *注意别被特点坑*
通过命令停止任务并生成保存点:flink stop -p ${savepointDir} ${jobId} *不*保留 *注意别被特点坑 *

[hidden email] <[hidden email]> 于2021年1月14日周四 下午6:23写道:

> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> Found 1 items
> -rw-rw-r--   3 yarn hdfs       5388 2021-01-14 17:03
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
>  // 这个是通过JobManger看到已经checkpoing完成后去查询出来的记录,的确是生成了,里面已经包含了_metadata文件
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> //我停止任务后再去查询时,这个目录已经删除了,出错如下
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> ls:
> `hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6':
> No such file or directory
> //出错信息,checkpoint信息被删除了,这个目录我是专门用来存放checkpoint信息,排除其他主动删除该文件的可能
>
>
>
>
>
> [hidden email]
>
> 发件人: tison
> 发送时间: 2021-01-14 18:04
> 收件人: user-zh
> 主题: Re: 请教个Flink checkpoint的问题
> 没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。
>
> Best,
> tison.
>
>
> Evan <[hidden email]> 于2021年1月14日周四 下午5:56写道:
>
> > 代码图挂掉了,看不到代码
> >
> >
> >
> >
> > 发件人: [hidden email]
> > 发送时间: 2021-01-14 17:26
> > 收件人: user-zh
> > 主题: 请教个Flink checkpoint的问题
> >
> >
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
> >
> >
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
> >
> >
> > [hidden email]
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: 请教个Flink checkpoint的问题

占英华
感谢您的答复!



[hidden email]
 
发件人: 赵一旦
发送时间: 2021-01-14 18:43
收件人: user-zh
主题: Re: Re: 请教个Flink checkpoint的问题
机制就是这样的。如下是我之前做过的测试。
启动后等待若干检查点之后做如下操作文件系统上的检查点是否保留说明
WEB UI 点击 Cancel 方式取消任务 保留 合理,因为设置了 RETAIN_ON_CANCELLATION。
通过命令生成保存点:flink savepoint ${jobId} ${savepointDir} 保留 OK
通过命令取消任务:flink cancel ${jobId} 保留 OK
通过命令取消任务并生成保存点:flink cancel -s ${savepointDir} ${jobId} 保留 OK
通过命令停止任务(基于默认保存点目录):flink stop ${jobId} *不*保留 *注意别被特点坑*
通过命令停止任务并生成保存点:flink stop -p ${savepointDir} ${jobId} *不*保留 *注意别被特点坑 *
 
[hidden email] <[hidden email]> 于2021年1月14日周四 下午6:23写道:
 

> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> Found 1 items
> -rw-rw-r--   3 yarn hdfs       5388 2021-01-14 17:03
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
>  // 这个是通过JobManger看到已经checkpoing完成后去查询出来的记录,的确是生成了,里面已经包含了_metadata文件
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> //我停止任务后再去查询时,这个目录已经删除了,出错如下
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> ls:
> `hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6':
> No such file or directory
> //出错信息,checkpoint信息被删除了,这个目录我是专门用来存放checkpoint信息,排除其他主动删除该文件的可能
>
>
>
>
>
> [hidden email]
>
> 发件人: tison
> 发送时间: 2021-01-14 18:04
> 收件人: user-zh
> 主题: Re: 请教个Flink checkpoint的问题
> 没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。
>
> Best,
> tison.
>
>
> Evan <[hidden email]> 于2021年1月14日周四 下午5:56写道:
>
> > 代码图挂掉了,看不到代码
> >
> >
> >
> >
> > 发件人: [hidden email]
> > 发送时间: 2021-01-14 17:26
> > 收件人: user-zh
> > 主题: 请教个Flink checkpoint的问题
> >
> >
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
> >
> >
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
> >
> >
> > [hidden email]
> >
>
Reply | Threaded
Open this post in threaded view
|

回复: 回复: 请教个Flink checkpoint的问题

Evan
In reply to this post by 占英华
是的,应该是机制问题,链接[1]打开有这样一句解释:

 If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (terminating with job status JobStatus#CANCELED).

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention 

如回答有误,请指正。





 
发件人: [hidden email]
发送时间: 2021-01-14 18:02
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
代码如下:
    streamEnv.enableCheckpointing(5 * 60 * 1000);
    CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
    checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
    checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
    checkPointConfig.setMaxConcurrentCheckpoints(1);
    checkPointConfig.setTolerableCheckpointFailureNumber(3);
    checkPointConfig
            .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
    try {
      StateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointPath);
      streamEnv.setStateBackend(rocksDBStateBackend);



[hidden email]
 
发件人: Evan
发送时间: 2021-01-14 17:55
收件人: user-zh
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
 
 
 
发件人: [hidden email]
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
 
 
[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: 回复: 请教个Flink checkpoint的问题

nobleyd
In reply to this post by 占英华
Evan说的这个是一个设置,但也仅影响cancel那个命令,stop还是会删除。这个点其实做的不是很好,不清楚为啥,之前Q过,没人鸟。。。
所以按照我的经验,如果是需要停止并基于保存点重启,那还好。如果计划基于检查点重启,无比提前备份检查点,然后停任务,然后复制备份回去。
在或者,直接cancel,不用stop。

Evan <[hidden email]> 于2021年1月14日周四 下午6:49写道:

> 是的,应该是机制问题,链接[1]打开有这样一句解释:
>
>  If you choose to retain externalized checkpoints on cancellation you have
> to handle checkpoint clean up manually when you cancel the job as well
> (terminating with job status JobStatus#CANCELED).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
>
> 如回答有误,请指正。
>
>
>
>
>
>
> 发件人: [hidden email]
> 发送时间: 2021-01-14 18:02
> 收件人: user-zh
> 主题: 回复: 回复: 请教个Flink checkpoint的问题
> 代码如下:
>     streamEnv.enableCheckpointing(5 * 60 * 1000);
>     CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
>     checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
>     checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
>     checkPointConfig.setMaxConcurrentCheckpoints(1);
>     checkPointConfig.setTolerableCheckpointFailureNumber(3);
>     checkPointConfig
>
> .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>     String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
>     try {
>       StateBackend rocksDBStateBackend = new
> RocksDBStateBackend(checkpointPath);
>       streamEnv.setStateBackend(rocksDBStateBackend);
>
>
>
> [hidden email]
>
> 发件人: Evan
> 发送时间: 2021-01-14 17:55
> 收件人: user-zh
> 主题: 回复: 请教个Flink checkpoint的问题
> 代码图挂掉了,看不到代码
>
>
>
> 发件人: [hidden email]
> 发送时间: 2021-01-14 17:26
> 收件人: user-zh
> 主题: 请教个Flink checkpoint的问题
>
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
>
>
> [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

回复: 回复: 请教个Flink checkpoint的问题

占英华
In reply to this post by 占英华
好的,感谢您的回复!



[hidden email]
 
发件人: Evan
发送时间: 2021-01-14 18:48
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
是的,应该是机制问题,链接[1]打开有这样一句解释:
 
If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (terminating with job status JobStatus#CANCELED).
 
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention 
 
如回答有误,请指正。
 
 
 
 
 
发件人: [hidden email]
发送时间: 2021-01-14 18:02
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
代码如下:
    streamEnv.enableCheckpointing(5 * 60 * 1000);
    CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
    checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
    checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
    checkPointConfig.setMaxConcurrentCheckpoints(1);
    checkPointConfig.setTolerableCheckpointFailureNumber(3);
    checkPointConfig
            .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
    String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
    try {
      StateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointPath);
      streamEnv.setStateBackend(rocksDBStateBackend);
 
 
 
[hidden email]
发件人: Evan
发送时间: 2021-01-14 17:55
收件人: user-zh
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
发件人: [hidden email]
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: 回复: 请教个Flink checkpoint的问题

Yun Tang
Hi

这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain checkpoint的数量为1而被subsume掉了,也就是被删掉了。

如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。

另外说一句,即使是已经deprecated的cancel with savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。


[1] https://issues.apache.org/jira/browse/FLINK-10354
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained

祝好
唐云
________________________________
From: [hidden email] <[hidden email]>
Sent: Thursday, January 14, 2021 19:00
To: user-zh <[hidden email]>
Subject: 回复: 回复: 请教个Flink checkpoint的问题

好的,感谢您的回复!



[hidden email]

发件人: Evan
发送时间: 2021-01-14 18:48
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
是的,应该是机制问题,链接[1]打开有这样一句解释:

If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (terminating with job status JobStatus#CANCELED).

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention

如回答有误,请指正。





发件人: [hidden email]
发送时间: 2021-01-14 18:02
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
代码如下:
    streamEnv.enableCheckpointing(5 * 60 * 1000);
    CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
    checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
    checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
    checkPointConfig.setMaxConcurrentCheckpoints(1);
    checkPointConfig.setTolerableCheckpointFailureNumber(3);
    checkPointConfig
            .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
    try {
      StateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointPath);
      streamEnv.setStateBackend(rocksDBStateBackend);



[hidden email]
发件人: Evan
发送时间: 2021-01-14 17:55
收件人: user-zh
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
发件人: [hidden email]
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
[hidden email]
Reply | Threaded
Open this post in threaded view
|

回复: Re: 请教个Flink checkpoint的问题

占英华
感谢您的答复,今天我使用cancelWithSavepoint时也还是删除了,通过你的答复知道机制如此,我再使用另外的参数来观察下。



[hidden email]
 
发件人: Yun Tang
发送时间: 2021-01-15 11:02
收件人: user-zh
主题: Re: 回复: 请教个Flink checkpoint的问题
Hi
 
这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain checkpoint的数量为1而被subsume掉了,也就是被删掉了。
 
如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。
 
另外说一句,即使是已经deprecated的cancel with savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。
 
 
[1] https://issues.apache.org/jira/browse/FLINK-10354
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
 
祝好
唐云
________________________________
From: [hidden email] <[hidden email]>
Sent: Thursday, January 14, 2021 19:00
To: user-zh <[hidden email]>
Subject: 回复: 回复: 请教个Flink checkpoint的问题
 
好的,感谢您的回复!
 
 
 
[hidden email]
 
发件人: Evan
发送时间: 2021-01-14 18:48
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
是的,应该是机制问题,链接[1]打开有这样一句解释:
 
If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (terminating with job status JobStatus#CANCELED).
 
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
 
如回答有误,请指正。
 
 
 
 
 
发件人: [hidden email]
发送时间: 2021-01-14 18:02
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
代码如下:
    streamEnv.enableCheckpointing(5 * 60 * 1000);
    CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
    checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
    checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
    checkPointConfig.setMaxConcurrentCheckpoints(1);
    checkPointConfig.setTolerableCheckpointFailureNumber(3);
    checkPointConfig
            .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
    String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
    try {
      StateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointPath);
      streamEnv.setStateBackend(rocksDBStateBackend);
 
 
 
[hidden email]
发件人: Evan
发送时间: 2021-01-14 17:55
收件人: user-zh
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
发件人: [hidden email]
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Re: 请教个Flink checkpoint的问题

Congxian Qiu
Hi
    你可以看一下是不是 作业的状态变成了 FINISH 了,现在 Flink 在 FINISH 的时候,会删除 checkpoint(就算设置了
retain on cancel 也会删除)
    PS :FLINK-18263 正在讨论是否在 Job 变为 FINISH 状态后保留 checkpoint
[1] https://issues.apache.org/jira/browse/FLINK-18263
Best,
Congxian


[hidden email] <[hidden email]> 于2021年1月15日周五 上午11:23写道:

> 感谢您的答复,今天我使用cancelWithSavepoint时也还是删除了,通过你的答复知道机制如此,我再使用另外的参数来观察下。
>
>
>
> [hidden email]
>
> 发件人: Yun Tang
> 发送时间: 2021-01-15 11:02
> 收件人: user-zh
> 主题: Re: 回复: 请教个Flink checkpoint的问题
> Hi
>
> 这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with
> savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain
> checkpoint的数量为1而被subsume掉了,也就是被删掉了。
>
> 如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。
>
> 另外说一句,即使是已经deprecated的cancel with
> savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-10354
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
>
> 祝好
> 唐云
> ________________________________
> From: [hidden email] <[hidden email]>
> Sent: Thursday, January 14, 2021 19:00
> To: user-zh <[hidden email]>
> Subject: 回复: 回复: 请教个Flink checkpoint的问题
>
> 好的,感谢您的回复!
>
>
>
> [hidden email]
>
> 发件人: Evan
> 发送时间: 2021-01-14 18:48
> 收件人: user-zh
> 主题: 回复: 回复: 请教个Flink checkpoint的问题
> 是的,应该是机制问题,链接[1]打开有这样一句解释:
>
> If you choose to retain externalized checkpoints on cancellation you have
> to handle checkpoint clean up manually when you cancel the job as well
> (terminating with job status JobStatus#CANCELED).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
>
> 如回答有误,请指正。
>
>
>
>
>
> 发件人: [hidden email]
> 发送时间: 2021-01-14 18:02
> 收件人: user-zh
> 主题: 回复: 回复: 请教个Flink checkpoint的问题
> 代码如下:
>     streamEnv.enableCheckpointing(5 * 60 * 1000);
>     CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
>     checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
>     checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
>     checkPointConfig.setMaxConcurrentCheckpoints(1);
>     checkPointConfig.setTolerableCheckpointFailureNumber(3);
>     checkPointConfig
>
> .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>     String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
>     try {
>       StateBackend rocksDBStateBackend = new
> RocksDBStateBackend(checkpointPath);
>       streamEnv.setStateBackend(rocksDBStateBackend);
>
>
>
> [hidden email]
> 发件人: Evan
> 发送时间: 2021-01-14 17:55
> 收件人: user-zh
> 主题: 回复: 请教个Flink checkpoint的问题
> 代码图挂掉了,看不到代码
> 发件人: [hidden email]
> 发送时间: 2021-01-14 17:26
> 收件人: user-zh
> 主题: 请教个Flink checkpoint的问题
>
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
> [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: 请教个Flink checkpoint的问题

占英华
1:先前测试过使用stopWithSavepoint时会将之前成功保存的checkpoint数据给删除掉,后来我们查看了下源码,里面描述如下,就是调用该方法时Flink会将程序设置成Finished态的,可能和实际使用场景有出入。
/**
 * Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
 * Stopping works only for streaming programs. Be aware, that the program might continue to run for
 * a while after sending the stop command, because after sources stopped to emit data all operators
 * need to finish processing.
 *
 * @param jobId the job ID of the streaming program to stop
 * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline
 * @param savepointDirectory directory the savepoint should be written to
 * @return a {@link CompletableFuture} containing the path where the savepoint is located
 */

CompletableFuture<String> stopWithSavepoint(final JobID jobId, final boolean advanceToEndOfEventTime, @Nullable final String savepointDirectory);

















在 2021-01-17 16:48:22,"Congxian Qiu" <[hidden email]> 写道:

>Hi
>    你可以看一下是不是 作业的状态变成了 FINISH 了,现在 Flink 在 FINISH 的时候,会删除 checkpoint(就算设置了
>retain on cancel 也会删除)
>    PS :FLINK-18263 正在讨论是否在 Job 变为 FINISH 状态后保留 checkpoint
>[1] https://issues.apache.org/jira/browse/FLINK-18263
>Best,
>Congxian
>
>
>[hidden email] <[hidden email]> 于2021年1月15日周五 上午11:23写道:
>
>> 感谢您的答复,今天我使用cancelWithSavepoint时也还是删除了,通过你的答复知道机制如此,我再使用另外的参数来观察下。
>>
>>
>>
>> [hidden email]
>>
>> 发件人: Yun Tang
>> 发送时间: 2021-01-15 11:02
>> 收件人: user-zh
>> 主题: Re: 回复: 请教个Flink checkpoint的问题
>> Hi
>>
>> 这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with
>> savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain
>> checkpoint的数量为1而被subsume掉了,也就是被删掉了。
>>
>> 如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。
>>
>> 另外说一句,即使是已经deprecated的cancel with
>> savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10354
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
>>
>> 祝好
>> 唐云
>> ________________________________
>> From: [hidden email] <[hidden email]>
>> Sent: Thursday, January 14, 2021 19:00
>> To: user-zh <[hidden email]>
>> Subject: 回复: 回复: 请教个Flink checkpoint的问题
>>
>> 好的,感谢您的回复!
>>
>>
>>
>> [hidden email]
>>
>> 发件人: Evan
>> 发送时间: 2021-01-14 18:48
>> 收件人: user-zh
>> 主题: 回复: 回复: 请教个Flink checkpoint的问题
>> 是的,应该是机制问题,链接[1]打开有这样一句解释:
>>
>> If you choose to retain externalized checkpoints on cancellation you have
>> to handle checkpoint clean up manually when you cancel the job as well
>> (terminating with job status JobStatus#CANCELED).
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
>>
>> 如回答有误,请指正。
>>
>>
>>
>>
>>
>> 发件人: [hidden email]
>> 发送时间: 2021-01-14 18:02
>> 收件人: user-zh
>> 主题: 回复: 回复: 请教个Flink checkpoint的问题
>> 代码如下:
>>     streamEnv.enableCheckpointing(5 * 60 * 1000);
>>     CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
>>     checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
>>     checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
>>     checkPointConfig.setMaxConcurrentCheckpoints(1);
>>     checkPointConfig.setTolerableCheckpointFailureNumber(3);
>>     checkPointConfig
>>
>> .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>>     String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
>>     try {
>>       StateBackend rocksDBStateBackend = new
>> RocksDBStateBackend(checkpointPath);
>>       streamEnv.setStateBackend(rocksDBStateBackend);
>>
>>
>>
>> [hidden email]
>> 发件人: Evan
>> 发送时间: 2021-01-14 17:55
>> 收件人: user-zh
>> 主题: 回复: 请教个Flink checkpoint的问题
>> 代码图挂掉了,看不到代码
>> 发件人: [hidden email]
>> 发送时间: 2021-01-14 17:26
>> 收件人: user-zh
>> 主题: 请教个Flink checkpoint的问题
>>
>> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>>
>> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
>> [hidden email]
>>
Reply | Threaded
Open this post in threaded view
|

请教个Flink checkpoint的问题

gimlee
In reply to this post by 占英华
设置下state.checkpoints.num-retained这个值,像是数量超了自动删除



--
Sent from: http://apache-flink.147419.n8.nabble.com/