Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

LakeShen
大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的
Checkpoint 状态做清理。

当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
Checkpoint 超时失败。

但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的
Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?

希望有大佬能帮我解惑,非常感谢
Reply | Threaded
Open this post in threaded view
|

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

Yun Tang
Hi

如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
因此,加载的checkpoint被赋予了savepoint的property [2]。 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。

希望这些解释能解答你的困惑

[1] https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
[2] https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214

祝好
唐云




________________________________
From: LakeShen <[hidden email]>
Sent: Friday, January 17, 2020 16:28
To: [hidden email] <[hidden email]>
Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的
Checkpoint 状态做清理。

当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
Checkpoint 超时失败。

但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的
Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?

希望有大佬能帮我解惑,非常感谢
Reply | Threaded
Open this post in threaded view
|

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

LakeShen
Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
Savepoint 类似,如果不清理,就永久保留。
非常感谢


Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道:

> Hi
>
> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
> 因此,加载的checkpoint被赋予了savepoint的property [2]。
> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
>
> 希望这些解释能解答你的困惑
>
> [1]
> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
> [2]
> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
>
> 祝好
> 唐云
>
>
>
>
> ________________________________
> From: LakeShen <[hidden email]>
> Sent: Friday, January 17, 2020 16:28
> To: [hidden email] <[hidden email]>
> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>
> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
> 我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的
> Checkpoint 状态做清理。
>
> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
> Checkpoint 超时失败。
>
> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的
> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
>
> 希望有大佬能帮我解惑,非常感谢
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

LakeShen
是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。

LakeShen <[hidden email]> 于2020年1月19日周日 下午3:30写道:

> Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
> Savepoint 类似,如果不清理,就永久保留。
> 非常感谢
>
>
> Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道:
>
>> Hi
>>
>> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
>> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
>> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
>> 因此,加载的checkpoint被赋予了savepoint的property [2]。
>> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
>> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
>>
>> 希望这些解释能解答你的困惑
>>
>> [1]
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
>> [2]
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
>>
>> 祝好
>> 唐云
>>
>>
>>
>>
>> ________________________________
>> From: LakeShen <[hidden email]>
>> Sent: Friday, January 17, 2020 16:28
>> To: [hidden email] <[hidden email]>
>> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>>
>> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
>> 我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的
>> Checkpoint 状态做清理。
>>
>> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
>> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
>> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
>> Checkpoint 超时失败。
>>
>> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的
>> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
>> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
>>
>> 希望有大佬能帮我解惑,非常感谢
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

Yun Tang
Hi

目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。
如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。
如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk- 开头的目录。

祝好
唐云

________________________________
From: LakeShen <[hidden email]>
Sent: Sunday, January 19, 2020 15:42
To: [hidden email] <[hidden email]>
Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。

LakeShen <[hidden email]> 于2020年1月19日周日 下午3:30写道:

> Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
> Savepoint 类似,如果不清理,就永久保留。
> 非常感谢
>
>
> Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道:
>
>> Hi
>>
>> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
>> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
>> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
>> 因此,加载的checkpoint被赋予了savepoint的property [2]。
>> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
>> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
>>
>> 希望这些解释能解答你的困惑
>>
>> [1]
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
>> [2]
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
>>
>> 祝好
>> 唐云
>>
>>
>>
>>
>> ________________________________
>> From: LakeShen <[hidden email]>
>> Sent: Friday, January 17, 2020 16:28
>> To: [hidden email] <[hidden email]>
>> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>>
>> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
>> 我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的
>> Checkpoint 状态做清理。
>>
>> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
>> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
>> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
>> Checkpoint 超时失败。
>>
>> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的
>> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
>> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
>>
>> 希望有大佬能帮我解惑,非常感谢
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

LakeShen
非常感谢唐云的答复,目前在每个 Job-id 的 Checkpoint 子目录,有三个目录:
1. chk-id 的目录
2. shared 目录,其中状态非常大
3. taskowned

我研究了一下源码,flink 算子 和 keystate 的 managed state 会上传到 shared 目录,raw state 会上传到
chk-id目录。
如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?因为我查看 HDFS 的 chk-id目录,
占用的存储不大。下面是相关文件目录的大小:
1.3 M    hdfs:xxx/chk-94794
1.1 T    hdfs:xxx/shared
0        hdfs:xxx/taskowned
如果有什么理解错误,请指出,非常感谢。

祝好,
沈磊

Yun Tang <[hidden email]> 于2020年1月19日周日 下午4:11写道:

> Hi
>
> 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。
> 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。
> 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk-
> 开头的目录。
>
> 祝好
> 唐云
>
> ________________________________
> From: LakeShen <[hidden email]>
> Sent: Sunday, January 19, 2020 15:42
> To: [hidden email] <[hidden email]>
> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>
> 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。
>
> LakeShen <[hidden email]> 于2020年1月19日周日 下午3:30写道:
>
> > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
> > Savepoint 类似,如果不清理,就永久保留。
> > 非常感谢
> >
> >
> > Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道:
> >
> >> Hi
> >>
> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
> >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
> >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
> >> 因此,加载的checkpoint被赋予了savepoint的property [2]。
> >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
> >>
> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
> >>
> >> 希望这些解释能解答你的困惑
> >>
> >> [1]
> >>
> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
> >> [2]
> >>
> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
> >>
> >> 祝好
> >> 唐云
> >>
> >>
> >>
> >>
> >> ________________________________
> >> From: LakeShen <[hidden email]>
> >> Sent: Friday, January 17, 2020 16:28
> >> To: [hidden email] <[hidden email]>
> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
> >>
> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
> >> 我看了一下源码,发现当完成的 Checkpoint 数大于
> state.checkpoints.num-retained的数值时,会对之前的完成的
> >> Checkpoint 状态做清理。
> >>
> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
> >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
> >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
> >> Checkpoint 超时失败。
> >>
> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是
> state.checkpoints.num-retained又为1,完成的
> >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
> >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
> >>
> >> 希望有大佬能帮我解惑,非常感谢
> >>
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

LakeShen
另外,我的容错恢复是 Flink 自身行为 Checkpoint 的容错恢复,我看 CheckpointCoordinator 有两个方法:
restoreLatestCheckpointedState 和 restoreSavepoint,所以这两个方法有什么区别呢?

LakeShen <[hidden email]> 于2020年1月19日周日 下午4:30写道:

> 非常感谢唐云的答复,目前在每个 Job-id 的 Checkpoint 子目录,有三个目录:
> 1. chk-id 的目录
> 2. shared 目录,其中状态非常大
> 3. taskowned
>
> 我研究了一下源码,flink 算子 和 keystate 的 managed state 会上传到 shared 目录,raw state 会上传到
> chk-id目录。
> 如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?因为我查看 HDFS 的 chk-id目录,
> 占用的存储不大。下面是相关文件目录的大小:
> 1.3 M    hdfs:xxx/chk-94794
> 1.1 T    hdfs:xxx/shared
> 0        hdfs:xxx/taskowned
> 如果有什么理解错误,请指出,非常感谢。
>
> 祝好,
> 沈磊
>
> Yun Tang <[hidden email]> 于2020年1月19日周日 下午4:11写道:
>
>> Hi
>>
>> 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。
>> 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。
>> 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk-
>> 开头的目录。
>>
>> 祝好
>> 唐云
>>
>> ________________________________
>> From: LakeShen <[hidden email]>
>> Sent: Sunday, January 19, 2020 15:42
>> To: [hidden email] <[hidden email]>
>> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>>
>> 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。
>>
>> LakeShen <[hidden email]> 于2020年1月19日周日 下午3:30写道:
>>
>> > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
>> > Savepoint 类似,如果不清理,就永久保留。
>> > 非常感谢
>> >
>> >
>> > Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道:
>> >
>> >> Hi
>> >>
>> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
>> >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
>> >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
>> >> 因此,加载的checkpoint被赋予了savepoint的property [2]。
>> >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
>> >>
>> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
>> >>
>> >> 希望这些解释能解答你的困惑
>> >>
>> >> [1]
>> >>
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
>> >> [2]
>> >>
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
>> >>
>> >> 祝好
>> >> 唐云
>> >>
>> >>
>> >>
>> >>
>> >> ________________________________
>> >> From: LakeShen <[hidden email]>
>> >> Sent: Friday, January 17, 2020 16:28
>> >> To: [hidden email] <[hidden email]>
>> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>> >>
>> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
>> >> 我看了一下源码,发现当完成的 Checkpoint 数大于
>> state.checkpoints.num-retained的数值时,会对之前的完成的
>> >> Checkpoint 状态做清理。
>> >>
>> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
>> >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
>> >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
>> >> Checkpoint 超时失败。
>> >>
>> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是
>> state.checkpoints.num-retained又为1,完成的
>> >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
>> >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
>> >>
>> >> 希望有大佬能帮我解惑,非常感谢
>> >>
>> >
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

Yun Tang
Hi

如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?
chk- 开头的目录,描述的是对应checkpoint独享的state,包括 operator state,rocksDB的非增量文件(例如rocksDB的VERSION等非SST文件)等等,所以只要新的checkpoint完成了,这些目录可以安全删除。

CheckpointCoordinator 有两个方法:
restoreLatestCheckpointedState 和 restoreSavepoint,所以这两个方法有什么区别呢
restoreLatestCheckpointedState 主要是给failover用,从checkpoint store获取last completed checkpoint进行恢复。
restoreSavepoint 是给作业启动时候用,从用户传入的Savepoint加载需要的state进行恢复。

祝好
唐云


________________________________
From: LakeShen <[hidden email]>
Sent: Sunday, January 19, 2020 16:48
To: [hidden email] <[hidden email]>
Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

另外,我的容错恢复是 Flink 自身行为 Checkpoint 的容错恢复,我看 CheckpointCoordinator 有两个方法:
restoreLatestCheckpointedState 和 restoreSavepoint,所以这两个方法有什么区别呢?

LakeShen <[hidden email]> 于2020年1月19日周日 下午4:30写道:

> 非常感谢唐云的答复,目前在每个 Job-id 的 Checkpoint 子目录,有三个目录:
> 1. chk-id 的目录
> 2. shared 目录,其中状态非常大
> 3. taskowned
>
> 我研究了一下源码,flink 算子 和 keystate 的 managed state 会上传到 shared 目录,raw state 会上传到
> chk-id目录。
> 如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?因为我查看 HDFS 的 chk-id目录,
> 占用的存储不大。下面是相关文件目录的大小:
> 1.3 M    hdfs:xxx/chk-94794
> 1.1 T    hdfs:xxx/shared
> 0        hdfs:xxx/taskowned
> 如果有什么理解错误,请指出,非常感谢。
>
> 祝好,
> 沈磊
>
> Yun Tang <[hidden email]> 于2020年1月19日周日 下午4:11写道:
>
>> Hi
>>
>> 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。
>> 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。
>> 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk-
>> 开头的目录。
>>
>> 祝好
>> 唐云
>>
>> ________________________________
>> From: LakeShen <[hidden email]>
>> Sent: Sunday, January 19, 2020 15:42
>> To: [hidden email] <[hidden email]>
>> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>>
>> 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。
>>
>> LakeShen <[hidden email]> 于2020年1月19日周日 下午3:30写道:
>>
>> > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
>> > Savepoint 类似,如果不清理,就永久保留。
>> > 非常感谢
>> >
>> >
>> > Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道:
>> >
>> >> Hi
>> >>
>> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
>> >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
>> >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
>> >> 因此,加载的checkpoint被赋予了savepoint的property [2]。
>> >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
>> >>
>> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
>> >>
>> >> 希望这些解释能解答你的困惑
>> >>
>> >> [1]
>> >>
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
>> >> [2]
>> >>
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
>> >>
>> >> 祝好
>> >> 唐云
>> >>
>> >>
>> >>
>> >>
>> >> ________________________________
>> >> From: LakeShen <[hidden email]>
>> >> Sent: Friday, January 17, 2020 16:28
>> >> To: [hidden email] <[hidden email]>
>> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>> >>
>> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
>> >> 我看了一下源码,发现当完成的 Checkpoint 数大于
>> state.checkpoints.num-retained的数值时,会对之前的完成的
>> >> Checkpoint 状态做清理。
>> >>
>> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
>> >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
>> >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
>> >> Checkpoint 超时失败。
>> >>
>> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是
>> state.checkpoints.num-retained又为1,完成的
>> >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
>> >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
>> >>
>> >> 希望有大佬能帮我解惑,非常感谢
>> >>
>> >
>>
>