回复:流处理任务中checkpoint失败

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

回复:流处理任务中checkpoint失败

Robert.Zhang
Hi Congxian,
测试的时候数据量是很小的,cpu使用比较低的,让我比较奇怪的一点是我杀掉任务重启的话,有时候能成功进行checkpoint,看了下日志,就是这个iteration source成功执行了snapshot,发起了barrier,进而影响到后续operator的checkpoint。失败的时候是该source无法snapshot,直至超时。
因为flink这一块,iteration是由head tail组成,是一个比较特殊的stream task,目前还没有看到jm这边是如何对此进行处理的。这个iteration source其实是由其他source transform而来的,但是在dag图里这是作为一个source operator 存在的,不知道是否对于这个类型的operator taskid,barrier是否有特殊处理

---原始邮件---
发件人: "Congxian Qiu"<[hidden email]&gt;
发送时间: 2020年8月25日(周二) 下午5:33
收件人: "user-zh"<[hidden email]&gt;;
主题: Re: 流处理任务中checkpoint失败


Hi
&nbsp;&nbsp; 对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot
的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
Best,
Congxian


Robert.Zhang <[hidden email]&gt; 于2020年8月25日周二 上午12:58写道:

&gt; 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。
&gt; 官方文档对于在iterative
&gt; stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
&gt; 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
&gt; ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
&gt;
&gt; ---原始邮件---
&gt; 发件人: "Congxian Qiu"<[hidden email]&amp;gt;
&gt; 发送时间: 2020年8月24日(周一) 晚上8:21
&gt; 收件人: "user-zh"<[hidden email]&amp;gt;;
&gt; 主题: Re: 流处理任务中checkpoint失败
&gt;
&gt;
&gt; Hi
&gt; &amp;nbsp;&amp;nbsp; 从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的
&gt; checkpoint
&gt; 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
&gt; &amp;nbsp;&amp;nbsp; 另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。
&gt;
&gt; [1] https://zhuanlan.zhihu.com/p/87131964
&gt; Best,
&gt; Congxian
&gt;
&gt;
&gt; Robert.Zhang <[hidden email]&amp;gt; 于2020年8月21日周五 下午6:31写道:
&gt;
&gt; &amp;gt; Hello all,
&gt; &amp;gt; 目前遇到一个问题,在iterative stream job
&gt; &amp;gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
&gt; &amp;gt; 测试state 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
&gt; &amp;gt; Exceeded checkpoint tolerable failure threshold.的报错
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 配置如下:
&gt; &amp;gt; env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE, true);
&gt; &amp;gt; CheckpointConfig checkpointConfig = env.getCheckpointConfig();
&gt; &amp;gt; checkpointConfig.setCheckpointTimeout(600000);
&gt; &amp;gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000);
&gt; &amp;gt; checkpointConfig.setMaxConcurrentCheckpoints(4);
&gt; &amp;gt;
&gt; &amp;gt;
&gt; checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
&gt; &amp;gt; checkpointConfig.setPreferCheckpointForRecovery(true);
&gt; &amp;gt; checkpointConfig.setTolerableCheckpointFailureNumber(2);
&gt; &amp;gt; checkpointConfig.enableUnalignedCheckpoints();
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?