大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的 Checkpoint 状态做清理。 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从 chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务 Checkpoint 超时失败。 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的 Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗? 希望有大佬能帮我解惑,非常感谢 |
Hi
如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。 因此,加载的checkpoint被赋予了savepoint的property [2]。 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。 希望这些解释能解答你的困惑 [1] https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141 [2] https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214 祝好 唐云 ________________________________ From: LakeShen <[hidden email]> Sent: Friday, January 17, 2020 16:28 To: [hidden email] <[hidden email]> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1, 我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的 Checkpoint 状态做清理。 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从 chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务 Checkpoint 超时失败。 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的 Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗? 希望有大佬能帮我解惑,非常感谢 |
Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
Savepoint 类似,如果不清理,就永久保留。 非常感谢 Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道: > Hi > > 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by > design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 > [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。 > 因此,加载的checkpoint被赋予了savepoint的property [2]。 > 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed > 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。 > > 希望这些解释能解答你的困惑 > > [1] > https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141 > [2] > https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214 > > 祝好 > 唐云 > > > > > ________________________________ > From: LakeShen <[hidden email]> > Sent: Friday, January 17, 2020 16:28 > To: [hidden email] <[hidden email]> > Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 > > 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1, > 我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的 > Checkpoint 状态做清理。 > > 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为 > 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从 > chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务 > Checkpoint 超时失败。 > > 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的 > Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS > 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗? > > 希望有大佬能帮我解惑,非常感谢 > |
是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。
LakeShen <[hidden email]> 于2020年1月19日周日 下午3:30写道: > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和 > Savepoint 类似,如果不清理,就永久保留。 > 非常感谢 > > > Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道: > >> Hi >> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。 >> 因此,加载的checkpoint被赋予了savepoint的property [2]。 >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed >> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。 >> >> 希望这些解释能解答你的困惑 >> >> [1] >> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141 >> [2] >> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214 >> >> 祝好 >> 唐云 >> >> >> >> >> ________________________________ >> From: LakeShen <[hidden email]> >> Sent: Friday, January 17, 2020 16:28 >> To: [hidden email] <[hidden email]> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 >> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1, >> 我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的 >> Checkpoint 状态做清理。 >> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为 >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从 >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务 >> Checkpoint 超时失败。 >> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的 >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗? >> >> 希望有大佬能帮我解惑,非常感谢 >> > |
Hi
目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk- 开头的目录。 祝好 唐云 ________________________________ From: LakeShen <[hidden email]> Sent: Sunday, January 19, 2020 15:42 To: [hidden email] <[hidden email]> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。 LakeShen <[hidden email]> 于2020年1月19日周日 下午3:30写道: > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和 > Savepoint 类似,如果不清理,就永久保留。 > 非常感谢 > > > Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道: > >> Hi >> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。 >> 因此,加载的checkpoint被赋予了savepoint的property [2]。 >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed >> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。 >> >> 希望这些解释能解答你的困惑 >> >> [1] >> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141 >> [2] >> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214 >> >> 祝好 >> 唐云 >> >> >> >> >> ________________________________ >> From: LakeShen <[hidden email]> >> Sent: Friday, January 17, 2020 16:28 >> To: [hidden email] <[hidden email]> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 >> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1, >> 我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的 >> Checkpoint 状态做清理。 >> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为 >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从 >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务 >> Checkpoint 超时失败。 >> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的 >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗? >> >> 希望有大佬能帮我解惑,非常感谢 >> > |
非常感谢唐云的答复,目前在每个 Job-id 的 Checkpoint 子目录,有三个目录:
1. chk-id 的目录 2. shared 目录,其中状态非常大 3. taskowned 我研究了一下源码,flink 算子 和 keystate 的 managed state 会上传到 shared 目录,raw state 会上传到 chk-id目录。 如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?因为我查看 HDFS 的 chk-id目录, 占用的存储不大。下面是相关文件目录的大小: 1.3 M hdfs:xxx/chk-94794 1.1 T hdfs:xxx/shared 0 hdfs:xxx/taskowned 如果有什么理解错误,请指出,非常感谢。 祝好, 沈磊 Yun Tang <[hidden email]> 于2020年1月19日周日 下午4:11写道: > Hi > > 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。 > 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。 > 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk- > 开头的目录。 > > 祝好 > 唐云 > > ________________________________ > From: LakeShen <[hidden email]> > Sent: Sunday, January 19, 2020 15:42 > To: [hidden email] <[hidden email]> > Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 > > 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。 > > LakeShen <[hidden email]> 于2020年1月19日周日 下午3:30写道: > > > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和 > > Savepoint 类似,如果不清理,就永久保留。 > > 非常感谢 > > > > > > Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道: > > > >> Hi > >> > >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by > >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 > >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。 > >> 因此,加载的checkpoint被赋予了savepoint的property [2]。 > >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed > >> > 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。 > >> > >> 希望这些解释能解答你的困惑 > >> > >> [1] > >> > https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141 > >> [2] > >> > https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214 > >> > >> 祝好 > >> 唐云 > >> > >> > >> > >> > >> ________________________________ > >> From: LakeShen <[hidden email]> > >> Sent: Friday, January 17, 2020 16:28 > >> To: [hidden email] <[hidden email]> > >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 > >> > >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1, > >> 我看了一下源码,发现当完成的 Checkpoint 数大于 > state.checkpoints.num-retained的数值时,会对之前的完成的 > >> Checkpoint 状态做清理。 > >> > >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为 > >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从 > >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务 > >> Checkpoint 超时失败。 > >> > >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 > state.checkpoints.num-retained又为1,完成的 > >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS > >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗? > >> > >> 希望有大佬能帮我解惑,非常感谢 > >> > > > |
另外,我的容错恢复是 Flink 自身行为 Checkpoint 的容错恢复,我看 CheckpointCoordinator 有两个方法:
restoreLatestCheckpointedState 和 restoreSavepoint,所以这两个方法有什么区别呢? LakeShen <[hidden email]> 于2020年1月19日周日 下午4:30写道: > 非常感谢唐云的答复,目前在每个 Job-id 的 Checkpoint 子目录,有三个目录: > 1. chk-id 的目录 > 2. shared 目录,其中状态非常大 > 3. taskowned > > 我研究了一下源码,flink 算子 和 keystate 的 managed state 会上传到 shared 目录,raw state 会上传到 > chk-id目录。 > 如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?因为我查看 HDFS 的 chk-id目录, > 占用的存储不大。下面是相关文件目录的大小: > 1.3 M hdfs:xxx/chk-94794 > 1.1 T hdfs:xxx/shared > 0 hdfs:xxx/taskowned > 如果有什么理解错误,请指出,非常感谢。 > > 祝好, > 沈磊 > > Yun Tang <[hidden email]> 于2020年1月19日周日 下午4:11写道: > >> Hi >> >> 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。 >> 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。 >> 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk- >> 开头的目录。 >> >> 祝好 >> 唐云 >> >> ________________________________ >> From: LakeShen <[hidden email]> >> Sent: Sunday, January 19, 2020 15:42 >> To: [hidden email] <[hidden email]> >> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 >> >> 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。 >> >> LakeShen <[hidden email]> 于2020年1月19日周日 下午3:30写道: >> >> > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和 >> > Savepoint 类似,如果不清理,就永久保留。 >> > 非常感谢 >> > >> > >> > Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道: >> > >> >> Hi >> >> >> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by >> >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 >> >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。 >> >> 因此,加载的checkpoint被赋予了savepoint的property [2]。 >> >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed >> >> >> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。 >> >> >> >> 希望这些解释能解答你的困惑 >> >> >> >> [1] >> >> >> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141 >> >> [2] >> >> >> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214 >> >> >> >> 祝好 >> >> 唐云 >> >> >> >> >> >> >> >> >> >> ________________________________ >> >> From: LakeShen <[hidden email]> >> >> Sent: Friday, January 17, 2020 16:28 >> >> To: [hidden email] <[hidden email]> >> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 >> >> >> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1, >> >> 我看了一下源码,发现当完成的 Checkpoint 数大于 >> state.checkpoints.num-retained的数值时,会对之前的完成的 >> >> Checkpoint 状态做清理。 >> >> >> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为 >> >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从 >> >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务 >> >> Checkpoint 超时失败。 >> >> >> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 >> state.checkpoints.num-retained又为1,完成的 >> >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS >> >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗? >> >> >> >> 希望有大佬能帮我解惑,非常感谢 >> >> >> > >> > |
Hi
如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢? chk- 开头的目录,描述的是对应checkpoint独享的state,包括 operator state,rocksDB的非增量文件(例如rocksDB的VERSION等非SST文件)等等,所以只要新的checkpoint完成了,这些目录可以安全删除。 CheckpointCoordinator 有两个方法: restoreLatestCheckpointedState 和 restoreSavepoint,所以这两个方法有什么区别呢 restoreLatestCheckpointedState 主要是给failover用,从checkpoint store获取last completed checkpoint进行恢复。 restoreSavepoint 是给作业启动时候用,从用户传入的Savepoint加载需要的state进行恢复。 祝好 唐云 ________________________________ From: LakeShen <[hidden email]> Sent: Sunday, January 19, 2020 16:48 To: [hidden email] <[hidden email]> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 另外,我的容错恢复是 Flink 自身行为 Checkpoint 的容错恢复,我看 CheckpointCoordinator 有两个方法: restoreLatestCheckpointedState 和 restoreSavepoint,所以这两个方法有什么区别呢? LakeShen <[hidden email]> 于2020年1月19日周日 下午4:30写道: > 非常感谢唐云的答复,目前在每个 Job-id 的 Checkpoint 子目录,有三个目录: > 1. chk-id 的目录 > 2. shared 目录,其中状态非常大 > 3. taskowned > > 我研究了一下源码,flink 算子 和 keystate 的 managed state 会上传到 shared 目录,raw state 会上传到 > chk-id目录。 > 如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?因为我查看 HDFS 的 chk-id目录, > 占用的存储不大。下面是相关文件目录的大小: > 1.3 M hdfs:xxx/chk-94794 > 1.1 T hdfs:xxx/shared > 0 hdfs:xxx/taskowned > 如果有什么理解错误,请指出,非常感谢。 > > 祝好, > 沈磊 > > Yun Tang <[hidden email]> 于2020年1月19日周日 下午4:11写道: > >> Hi >> >> 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。 >> 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。 >> 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk- >> 开头的目录。 >> >> 祝好 >> 唐云 >> >> ________________________________ >> From: LakeShen <[hidden email]> >> Sent: Sunday, January 19, 2020 15:42 >> To: [hidden email] <[hidden email]> >> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 >> >> 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。 >> >> LakeShen <[hidden email]> 于2020年1月19日周日 下午3:30写道: >> >> > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和 >> > Savepoint 类似,如果不清理,就永久保留。 >> > 非常感谢 >> > >> > >> > Yun Tang <[hidden email]> 于2020年1月19日周日 下午2:06写道: >> > >> >> Hi >> >> >> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by >> >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 >> >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。 >> >> 因此,加载的checkpoint被赋予了savepoint的property [2]。 >> >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed >> >> >> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。 >> >> >> >> 希望这些解释能解答你的困惑 >> >> >> >> [1] >> >> >> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141 >> >> [2] >> >> >> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214 >> >> >> >> 祝好 >> >> 唐云 >> >> >> >> >> >> >> >> >> >> ________________________________ >> >> From: LakeShen <[hidden email]> >> >> Sent: Friday, January 17, 2020 16:28 >> >> To: [hidden email] <[hidden email]> >> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理 >> >> >> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1, >> >> 我看了一下源码,发现当完成的 Checkpoint 数大于 >> state.checkpoints.num-retained的数值时,会对之前的完成的 >> >> Checkpoint 状态做清理。 >> >> >> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为 >> >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从 >> >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务 >> >> Checkpoint 超时失败。 >> >> >> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 >> state.checkpoints.num-retained又为1,完成的 >> >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS >> >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗? >> >> >> >> 希望有大佬能帮我解惑,非常感谢 >> >> >> > >> > |
Free forum by Nabble | Edit this page |