Hi Flink社区,
目前我们在调研checkpoint 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案? 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案? Best, Xingxing Di |
Hi
你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp 的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。 另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm 端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑 checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs 异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2 步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。 另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢? savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比 checkpoint 慢很多 [1] https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91 [2] https://issues.apache.org/jira/browse/FLINK-5763 Best, Congxian [hidden email] <[hidden email]> 于2020年6月11日周四 下午8:21写道: > Hi Flink社区, > 目前我们在调研checkpoint > 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。 > > 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。 > 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案? > 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案? > > > > Best, > Xingxing Di > |
Hi Xingxing
由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1] 来忽略失败的拷贝(例如FileNotFoundException) 文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。 [1] https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line_Options 祝好 唐云 ________________________________ From: Congxian Qiu <[hidden email]> Sent: Saturday, June 13, 2020 16:54 To: user-zh <[hidden email]> Subject: Re: 如何做checkpoint的灾备 Hi 你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp 的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。 另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm 端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑 checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs 异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2 步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。 另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢? savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比 checkpoint 慢很多 [1] https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91 [2] https://issues.apache.org/jira/browse/FLINK-5763 Best, Congxian [hidden email] <[hidden email]> 于2020年6月11日周四 下午8:21写道: > Hi Flink社区, > 目前我们在调研checkpoint > 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。 > > 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。 > 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案? > 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案? > > > > Best, > Xingxing Di > |
@Congxian Qiu 感谢你的回复!
1.先回答你的疑问,我们目前checkpoint跨机房容灾的需求比较强烈,是需要上生产的;关于flink 1.11 的savepoint,我们后面可以尝试一下,但是最近几个月还没有升级1.11版本的计划。 2.你说的定期把文件copy到本地集群,然后再复制到远程集群感觉是个可行的方案,我们让hadoop运维同学评估下。 3.checkpoint双写: 感谢分享checkpoint双写的思路,这块我们最近几天会先出个MVP版本验证一下,如果遇到问题肯定还需要请教一下。 异常处理确实需要仔细设计,至少需要保证写备用checkpoint时,不会影响原checkpoint。同时需要引入metric,记录备用checkpoint存储失败的情况 关于选用哪个集群的checkpoint,目前我们是打算手动控制的,会在flink上层提供一个可以批量切换HDFS(Flink集群)的接口 @唐云 感谢你的恢复! 我们试一下你说的方式,感谢。 题外话,我还有个疑问,就是如何判断checkpoint是否可用? 我们基本没有使用savepoint,如果作业挂了需要重新启动时,我们会指定从checkpoint恢复。如果从最新的checkpoint恢复,那很有可能因为checkpoint不完整,导致作业无法启动。 目前我们是简单处理的,优先使用倒数第2个checkpoint,但如果作业checkpoint少于2个,可能需要查找checkpoint路径,并手动指定。 PS:我们用的是flink 1.9.2 Best, Xingxing Di 发件人: Yun Tang 发送时间: 2020-06-14 00:48 收件人: user-zh 主题: Re: 如何做checkpoint的灾备 Hi Xingxing 由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1] 来忽略失败的拷贝(例如FileNotFoundException) 文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。 [1] https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line_Options 祝好 唐云 ________________________________ From: Congxian Qiu <[hidden email]> Sent: Saturday, June 13, 2020 16:54 To: user-zh <[hidden email]> Subject: Re: 如何做checkpoint的灾备 Hi 你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp 的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。 另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm 端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑 checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs 异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2 步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。 另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢? savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比 checkpoint 慢很多 [1] https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91 [2] https://issues.apache.org/jira/browse/FLINK-5763 Best, Congxian [hidden email] <[hidden email]> 于2020年6月11日周四 下午8:21写道: > Hi Flink社区, > 目前我们在调研checkpoint > 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。 > > 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。 > 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案? > 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案? > > > > Best, > Xingxing Di > |
正常的流程来说,能找到 checkpoint meta 文件,checkpoint 就是完整的。但是也可能会出现其他的一些异常(主要可能会有
FileNotFound 等异常),那些异常如果需要提前知道的话,可以再 JM 端通过遍历 checkpoint meta 文件来进行判断。 对于希望从 checkpoint 恢复的场景来说,可以考虑下能否在你们的场景中把 checkpoint meta 统一存储到某个地方,这样后续直接从这个地方读取即可。 Best, Congxian [hidden email] <[hidden email]> 于2020年6月15日周一 上午12:06写道: > @Congxian Qiu 感谢你的回复! > 1.先回答你的疑问,我们目前checkpoint跨机房容灾的需求比较强烈,是需要上生产的;关于flink 1.11 > 的savepoint,我们后面可以尝试一下,但是最近几个月还没有升级1.11版本的计划。 > 2.你说的定期把文件copy到本地集群,然后再复制到远程集群感觉是个可行的方案,我们让hadoop运维同学评估下。 > 3.checkpoint双写: > 感谢分享checkpoint双写的思路,这块我们最近几天会先出个MVP版本验证一下,如果遇到问题肯定还需要请教一下。 > > 异常处理确实需要仔细设计,至少需要保证写备用checkpoint时,不会影响原checkpoint。同时需要引入metric,记录备用checkpoint存储失败的情况 > 关于选用哪个集群的checkpoint,目前我们是打算手动控制的,会在flink上层提供一个可以批量切换HDFS(Flink集群)的接口 > @唐云 感谢你的恢复! > 我们试一下你说的方式,感谢。 > > 题外话,我还有个疑问,就是如何判断checkpoint是否可用? > > 我们基本没有使用savepoint,如果作业挂了需要重新启动时,我们会指定从checkpoint恢复。如果从最新的checkpoint恢复,那很有可能因为checkpoint不完整,导致作业无法启动。 > > 目前我们是简单处理的,优先使用倒数第2个checkpoint,但如果作业checkpoint少于2个,可能需要查找checkpoint路径,并手动指定。 > PS:我们用的是flink 1.9.2 > > > > Best, > Xingxing Di > > 发件人: Yun Tang > 发送时间: 2020-06-14 00:48 > 收件人: user-zh > 主题: Re: 如何做checkpoint的灾备 > Hi Xingxing > > 由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1] > 来忽略失败的拷贝(例如FileNotFoundException) > 文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。 > > [1] > https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line_Options > > 祝好 > 唐云 > > ________________________________ > From: Congxian Qiu <[hidden email]> > Sent: Saturday, June 13, 2020 16:54 > To: user-zh <[hidden email]> > Subject: Re: 如何做checkpoint的灾备 > > Hi > > 你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp > 的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。 > > 另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm > 端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑 > checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs > 异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2 > 步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。 > > 另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢? > savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比 > checkpoint 慢很多 > > [1] > > https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91 > [2] https://issues.apache.org/jira/browse/FLINK-5763 > > Best, > Congxian > > > [hidden email] <[hidden email]> 于2020年6月11日周四 下午8:21写道: > > > Hi Flink社区, > > 目前我们在调研checkpoint > > 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。 > > > > > 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。 > > 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案? > > 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案? > > > > > > > > Best, > > Xingxing Di > > > |
@Congxian 感谢你的回复,我们会参考你的思路。
Best, Xingxing Di Sender: Congxian Qiu Send Time: 2020-06-15 09:55 Receiver: user-zh cc: zhangyingchen; pengxingbo Subject: Re: Re: 如何做checkpoint的灾备 正常的流程来说,能找到 checkpoint meta 文件,checkpoint 就是完整的。但是也可能会出现其他的一些异常(主要可能会有 FileNotFound 等异常),那些异常如果需要提前知道的话,可以再 JM 端通过遍历 checkpoint meta 文件来进行判断。 对于希望从 checkpoint 恢复的场景来说,可以考虑下能否在你们的场景中把 checkpoint meta 统一存储到某个地方,这样后续直接从这个地方读取即可。 Best, Congxian [hidden email] <[hidden email]> 于2020年6月15日周一 上午12:06写道: > @Congxian Qiu 感谢你的回复! > 1.先回答你的疑问,我们目前checkpoint跨机房容灾的需求比较强烈,是需要上生产的;关于flink 1.11 > 的savepoint,我们后面可以尝试一下,但是最近几个月还没有升级1.11版本的计划。 > 2.你说的定期把文件copy到本地集群,然后再复制到远程集群感觉是个可行的方案,我们让hadoop运维同学评估下。 > 3.checkpoint双写: > 感谢分享checkpoint双写的思路,这块我们最近几天会先出个MVP版本验证一下,如果遇到问题肯定还需要请教一下。 > > 异常处理确实需要仔细设计,至少需要保证写备用checkpoint时,不会影响原checkpoint。同时需要引入metric,记录备用checkpoint存储失败的情况 > 关于选用哪个集群的checkpoint,目前我们是打算手动控制的,会在flink上层提供一个可以批量切换HDFS(Flink集群)的接口 > @唐云 感谢你的恢复! > 我们试一下你说的方式,感谢。 > > 题外话,我还有个疑问,就是如何判断checkpoint是否可用? > > 我们基本没有使用savepoint,如果作业挂了需要重新启动时,我们会指定从checkpoint恢复。如果从最新的checkpoint恢复,那很有可能因为checkpoint不完整,导致作业无法启动。 > > 目前我们是简单处理的,优先使用倒数第2个checkpoint,但如果作业checkpoint少于2个,可能需要查找checkpoint路径,并手动指定。 > PS:我们用的是flink 1.9.2 > > > > Best, > Xingxing Di > > 发件人: Yun Tang > 发送时间: 2020-06-14 00:48 > 收件人: user-zh > 主题: Re: 如何做checkpoint的灾备 > Hi Xingxing > > 由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1] > 来忽略失败的拷贝(例如FileNotFoundException) > 文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。 > > [1] > https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line_Options > > 祝好 > 唐云 > > ________________________________ > From: Congxian Qiu <[hidden email]> > Sent: Saturday, June 13, 2020 16:54 > To: user-zh <[hidden email]> > Subject: Re: 如何做checkpoint的灾备 > > Hi > > 你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp > 的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。 > > 另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm > 端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑 > checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs > 异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2 > 步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。 > > 另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢? > savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比 > checkpoint 慢很多 > > [1] > > https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91 > [2] https://issues.apache.org/jira/browse/FLINK-5763 > > Best, > Congxian > > > [hidden email] <[hidden email]> 于2020年6月11日周四 下午8:21写道: > > > Hi Flink社区, > > 目前我们在调研checkpoint > > 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。 > > > > > 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。 > > 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案? > > 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案? > > > > > > > > Best, > > Xingxing Di > > > |
Free forum by Nabble | Edit this page |