回复:流处理任务中checkpoint失败

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

回复:流处理任务中checkpoint失败

Robert.Zhang
Hi Congxian,

开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。
该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。
比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据,
那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。



为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢?





---原始邮件---
发件人: "Congxian Qiu"<[hidden email]>
发送时间: 2020年8月25日(周二) 下午5:33
收件人: "user-zh"<[hidden email]>;
主题: Re: 流处理任务中checkpoint失败

Hi
   对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot
的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
Best,
Congxian


Robert.Zhang <[hidden email]> 于2020年8月25日周二 上午12:58写道:

> 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。
> 官方文档对于在iterative
> stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
> 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
> ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
>
> ---原始邮件---
> 发件人: "Congxian Qiu"<[hidden email]&gt;
> 发送时间: 2020年8月24日(周一) 晚上8:21
> 收件人: "user-zh"<[hidden email]&gt;;
> 主题: Re: 流处理任务中checkpoint失败
>
>
> Hi
> &nbsp;&nbsp; 从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的
> checkpoint
> 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
> &nbsp;&nbsp; 另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。
>
> [1] https://zhuanlan.zhihu.com/p/87131964
> Best,
> Congxian
>
>
> Robert.Zhang <[hidden email]&gt; 于2020年8月21日周五 下午6:31写道:
>
> &gt; Hello all,
> &gt; 目前遇到一个问题,在iterative stream job
> &gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
> &gt; 测试state 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
> &gt; Exceeded checkpoint tolerable failure threshold.的报错
> &gt;
> &gt;
> &gt; 配置如下:
> &gt; env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE, true);
> &gt; CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> &gt; checkpointConfig.setCheckpointTimeout(600000);
> &gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000);
> &gt; checkpointConfig.setMaxConcurrentCheckpoints(4);
> &gt;
> &gt;
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> &gt; checkpointConfig.setPreferCheckpointForRecovery(true);
> &gt; checkpointConfig.setTolerableCheckpointFailureNumber(2);
> &gt; checkpointConfig.enableUnalignedCheckpoints();
> &gt;
> &gt;
> &gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?
Reply | Threaded
Open this post in threaded view
|

Re: 流处理任务中checkpoint失败

Congxian Qiu
Hi
   按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到
barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看
Best,
Congxian


Robert.Zhang <[hidden email]> 于2020年8月26日周三 上午11:43写道:

> Hi Congxian,
>
> 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。
> 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。
> 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据,
> 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。
>
>
>
> 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢?
>
>
>
>
>
> ---原始邮件---
> *发件人:* "Congxian Qiu"<[hidden email]>
> *发送时间:* 2020年8月25日(周二) 下午5:33
> *收件人:* "user-zh"<[hidden email]>;
> *主题:* Re: 流处理任务中checkpoint失败
>
> Hi
>    对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot
> 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
> snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
> Best,
> Congxian
>
>
> Robert.Zhang <[hidden email]> 于2020年8月25日周二 上午12:58写道:
>
> > 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。
> > 官方文档对于在iterative
> > stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
> > 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
> > ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
> >
> > ---原始邮件---
> > 发件人: "Congxian Qiu"<[hidden email]&gt;
> > 发送时间: 2020年8月24日(周一) 晚上8:21
> > 收件人: "user-zh"<[hidden email]&gt;;
> > 主题: Re: 流处理任务中checkpoint失败
> >
> >
> > Hi
> > &nbsp;&nbsp; 从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的
> > checkpoint
> > 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
> > &nbsp;&nbsp; 另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。
> >
> > [1] https://zhuanlan.zhihu.com/p/87131964
> > Best,
> > Congxian
> >
> >
> > Robert.Zhang <[hidden email]&gt; 于2020年8月21日周五 下午6:31写道:
> >
> > &gt; Hello all,
> > &gt; 目前遇到一个问题,在iterative stream job
> > &gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
> > &gt; 测试state
> 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
> > &gt; Exceeded checkpoint tolerable failure threshold.的报错
> > &gt;
> > &gt;
> > &gt; 配置如下:
> > &gt; env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE,
> true);
> > &gt; CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > &gt; checkpointConfig.setCheckpointTimeout(600000);
> > &gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000);
> > &gt; checkpointConfig.setMaxConcurrentCheckpoints(4);
> > &gt;
> > &gt;
> >
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > &gt; checkpointConfig.setPreferCheckpointForRecovery(true);
> > &gt; checkpointConfig.setTolerableCheckpointFailureNumber(2);
> > &gt; checkpointConfig.enableUnalignedCheckpoints();
> > &gt;
> > &gt;
> > &gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?
>
Reply | Threaded
Open this post in threaded view
|

回复: 流处理任务中checkpoint失败

Robert.Zhang
Hi


代码大致如下:
DataStream broad=env.readFrom(...).broad;
DataStream firstSource=env.readFrom(...);
DataStream secondSource=env.readFrom(...);


DataStream&nbsp; union=firstSource.union(secondSource);
IterativeStream iterativeStream=union.keyby(...).process(...).iterate();

DataStream&nbsp; result=iterativeStream.closeWith(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; iterativeStream
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .keyby(...)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .connect(broad)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .process(...));
result.addSink(...);


是否是代码的书写上有问题呢?不胜感激,Thanks all




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年8月26日(星期三) 晚上7:18
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 流处理任务中checkpoint失败



Hi
&nbsp;&nbsp; 按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到
barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看
Best,
Congxian


Robert.Zhang <[hidden email]&gt; 于2020年8月26日周三 上午11:43写道:

&gt; Hi Congxian,
&gt;
&gt; 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。
&gt; 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。
&gt; 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据,
&gt; 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。
&gt;
&gt;
&gt;
&gt; 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢?
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; ---原始邮件---
&gt; *发件人:* "Congxian Qiu"<[hidden email]&gt;
&gt; *发送时间:* 2020年8月25日(周二) 下午5:33
&gt; *收件人:* "user-zh"<[hidden email]&gt;;
&gt; *主题:* Re: 流处理任务中checkpoint失败
&gt;
&gt; Hi
&gt;&nbsp;&nbsp;&nbsp; 对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot
&gt; 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
&gt; snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
&gt; Best,
&gt; Congxian
&gt;
&gt;
&gt; Robert.Zhang <[hidden email]&gt; 于2020年8月25日周二 上午12:58写道:
&gt;
&gt; &gt; 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。
&gt; &gt; 官方文档对于在iterative
&gt; &gt; stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
&gt; &gt; 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
&gt; &gt; ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
&gt; &gt;
&gt; &gt; ---原始邮件---
&gt; &gt; 发件人: "Congxian Qiu"<[hidden email]&amp;gt;
&gt; &gt; 发送时间: 2020年8月24日(周一) 晚上8:21
&gt; &gt; 收件人: "user-zh"<[hidden email]&amp;gt;;
&gt; &gt; 主题: Re: 流处理任务中checkpoint失败
&gt; &gt;
&gt; &gt;
&gt; &gt; Hi
&gt; &gt; &amp;nbsp;&amp;nbsp; 从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的
&gt; &gt; checkpoint
&gt; &gt; 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
&gt; &gt; &amp;nbsp;&amp;nbsp; 另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。
&gt; &gt;
&gt; &gt; [1] https://zhuanlan.zhihu.com/p/87131964
&gt; &gt; Best,
&gt; &gt; Congxian
&gt; &gt;
&gt; &gt;
&gt; &gt; Robert.Zhang <[hidden email]&amp;gt; 于2020年8月21日周五 下午6:31写道:
&gt; &gt;
&gt; &gt; &amp;gt; Hello all,
&gt; &gt; &amp;gt; 目前遇到一个问题,在iterative stream job
&gt; &gt; &amp;gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
&gt; &gt; &amp;gt; 测试state
&gt; 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
&gt; &gt; &amp;gt; Exceeded checkpoint tolerable failure threshold.的报错
&gt; &gt; &amp;gt;
&gt; &gt; &amp;gt;
&gt; &gt; &amp;gt; 配置如下:
&gt; &gt; &amp;gt; env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE,
&gt; true);
&gt; &gt; &amp;gt; CheckpointConfig checkpointConfig = env.getCheckpointConfig();
&gt; &gt; &amp;gt; checkpointConfig.setCheckpointTimeout(600000);
&gt; &gt; &amp;gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000);
&gt; &gt; &amp;gt; checkpointConfig.setMaxConcurrentCheckpoints(4);
&gt; &gt; &amp;gt;
&gt; &gt; &amp;gt;
&gt; &gt;
&gt; checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
&gt; &gt; &amp;gt; checkpointConfig.setPreferCheckpointForRecovery(true);
&gt; &gt; &amp;gt; checkpointConfig.setTolerableCheckpointFailureNumber(2);
&gt; &gt; &amp;gt; checkpointConfig.enableUnalignedCheckpoints();
&gt; &gt; &amp;gt;
&gt; &gt; &amp;gt;
&gt; &gt; &amp;gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?
&gt;
Reply | Threaded
Open this post in threaded view
|

Re: 回复: 流处理任务中checkpoint失败

Yun Tang
Hi Robert

你的两个source firstSource和secondSource是自己实现的么,一种怀疑是source在没有数据时持锁[1][2]导致checkpoint barrier并没有下发。
建议使用jstack查看在没有数据下发时,source相关task的java调用栈,观察是否存在等待锁释放

[1] https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L916
[2] https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L92

祝好
唐云
________________________________
From: Robert.Zhang <[hidden email]>
Sent: Wednesday, August 26, 2020 22:17
To: user-zh <[hidden email]>
Subject: 回复: 流处理任务中checkpoint失败

Hi


代码大致如下:
DataStream broad=env.readFrom(...).broad;
DataStream firstSource=env.readFrom(...);
DataStream secondSource=env.readFrom(...);


DataStream&nbsp; union=firstSource.union(secondSource);
IterativeStream iterativeStream=union.keyby(...).process(...).iterate();

DataStream&nbsp; result=iterativeStream.closeWith(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; iterativeStream
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .keyby(...)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .connect(broad)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .process(...));
result.addSink(...);


是否是代码的书写上有问题呢?不胜感激,Thanks all




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年8月26日(星期三) 晚上7:18
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 流处理任务中checkpoint失败



Hi
&nbsp;&nbsp; 按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到
barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看
Best,
Congxian


Robert.Zhang <[hidden email]&gt; 于2020年8月26日周三 上午11:43写道:

&gt; Hi Congxian,
&gt;
&gt; 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。
&gt; 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。
&gt; 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据,
&gt; 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。
&gt;
&gt;
&gt;
&gt; 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢?
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; ---原始邮件---
&gt; *发件人:* "Congxian Qiu"<[hidden email]&gt;
&gt; *发送时间:* 2020年8月25日(周二) 下午5:33
&gt; *收件人:* "user-zh"<[hidden email]&gt;;
&gt; *主题:* Re: 流处理任务中checkpoint失败
&gt;
&gt; Hi
&gt;&nbsp;&nbsp;&nbsp; 对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot
&gt; 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
&gt; snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
&gt; Best,
&gt; Congxian
&gt;
&gt;
&gt; Robert.Zhang <[hidden email]&gt; 于2020年8月25日周二 上午12:58写道:
&gt;
&gt; &gt; 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。
&gt; &gt; 官方文档对于在iterative
&gt; &gt; stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
&gt; &gt; 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
&gt; &gt; ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
&gt; &gt;
&gt; &gt; ---原始邮件---
&gt; &gt; 发件人: "Congxian Qiu"<[hidden email]&amp;gt;
&gt; &gt; 发送时间: 2020年8月24日(周一) 晚上8:21
&gt; &gt; 收件人: "user-zh"<[hidden email]&amp;gt;;
&gt; &gt; 主题: Re: 流处理任务中checkpoint失败
&gt; &gt;
&gt; &gt;
&gt; &gt; Hi
&gt; &gt; &amp;nbsp;&amp;nbsp; 从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的
&gt; &gt; checkpoint
&gt; &gt; 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
&gt; &gt; &amp;nbsp;&amp;nbsp; 另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。
&gt; &gt;
&gt; &gt; [1] https://zhuanlan.zhihu.com/p/87131964
&gt; &gt; Best,
&gt; &gt; Congxian
&gt; &gt;
&gt; &gt;
&gt; &gt; Robert.Zhang <[hidden email]&amp;gt; 于2020年8月21日周五 下午6:31写道:
&gt; &gt;
&gt; &gt; &amp;gt; Hello all,
&gt; &gt; &amp;gt; 目前遇到一个问题,在iterative stream job
&gt; &gt; &amp;gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
&gt; &gt; &amp;gt; 测试state
&gt; 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
&gt; &gt; &amp;gt; Exceeded checkpoint tolerable failure threshold.的报错
&gt; &gt; &amp;gt;
&gt; &gt; &amp;gt;
&gt; &gt; &amp;gt; 配置如下:
&gt; &gt; &amp;gt; env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE,
&gt; true);
&gt; &gt; &amp;gt; CheckpointConfig checkpointConfig = env.getCheckpointConfig();
&gt; &gt; &amp;gt; checkpointConfig.setCheckpointTimeout(600000);
&gt; &gt; &amp;gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000);
&gt; &gt; &amp;gt; checkpointConfig.setMaxConcurrentCheckpoints(4);
&gt; &gt; &amp;gt;
&gt; &gt; &amp;gt;
&gt; &gt;
&gt; checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
&gt; &gt; &amp;gt; checkpointConfig.setPreferCheckpointForRecovery(true);
&gt; &gt; &amp;gt; checkpointConfig.setTolerableCheckpointFailureNumber(2);
&gt; &gt; &amp;gt; checkpointConfig.enableUnalignedCheckpoints();
&gt; &gt; &amp;gt;
&gt; &gt; &amp;gt;
&gt; &gt; &amp;gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?
&gt;
Reply | Threaded
Open this post in threaded view
|

Re: 回复: 流处理任务中checkpoint失败

Congxian Qiu
Hi
   从代码暂时没有看出问题,不确定 迭代 作业的 checkpoint 是否有特殊的地方,我抄送了一个对迭代这块更了解的人(Yun
Gao),或许他在这块有一些建议

Best,
Congxian


Yun Tang <[hidden email]> 于2020年8月27日周四 下午5:10写道:

> Hi Robert
>
> 你的两个source
> firstSource和secondSource是自己实现的么,一种怀疑是source在没有数据时持锁[1][2]导致checkpoint
> barrier并没有下发。
> 建议使用jstack查看在没有数据下发时,source相关task的java调用栈,观察是否存在等待锁释放
>
> [1]
> https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L916
> [2]
> https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L92
>
> 祝好
> 唐云
> ________________________________
> From: Robert.Zhang <[hidden email]>
> Sent: Wednesday, August 26, 2020 22:17
> To: user-zh <[hidden email]>
> Subject: 回复: 流处理任务中checkpoint失败
>
> Hi
>
>
> 代码大致如下:
> DataStream broad=env.readFrom(...).broad;
> DataStream firstSource=env.readFrom(...);
> DataStream secondSource=env.readFrom(...);
>
>
> DataStream&nbsp; union=firstSource.union(secondSource);
> IterativeStream iterativeStream=union.keyby(...).process(...).iterate();
>
> DataStream&nbsp; result=iterativeStream.closeWith(
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; iterativeStream
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; .keyby(...)
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; .connect(broad)
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; .process(...));
> result.addSink(...);
>
>
> 是否是代码的书写上有问题呢?不胜感激,Thanks all
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> [hidden email]&gt;;
> 发送时间:&nbsp;2020年8月26日(星期三) 晚上7:18
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: 流处理任务中checkpoint失败
>
>
>
> Hi
> &nbsp;&nbsp; 按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到
> barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看
> Best,
> Congxian
>
>
> Robert.Zhang <[hidden email]&gt; 于2020年8月26日周三 上午11:43写道:
>
> &gt; Hi Congxian,
> &gt;
> &gt; 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。
> &gt; 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。
> &gt; 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据,
> &gt; 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。
> &gt;
> &gt;
> &gt;
> &gt; 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢?
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; ---原始邮件---
> &gt; *发件人:* "Congxian Qiu"<[hidden email]&gt;
> &gt; *发送时间:* 2020年8月25日(周二) 下午5:33
> &gt; *收件人:* "user-zh"<[hidden email]&gt;;
> &gt; *主题:* Re: 流处理任务中checkpoint失败
> &gt;
> &gt; Hi
> &gt;&nbsp;&nbsp;&nbsp; 对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source
> 没有完成的话,或许看一下相应并发(没完成 snapshot
> &gt; 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
> &gt; snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
> &gt; Best,
> &gt; Congxian
> &gt;
> &gt;
> &gt; Robert.Zhang <[hidden email]&gt; 于2020年8月25日周二 上午12:58写道:
> &gt;
> &gt; &gt; 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration
> source的checkpoint始终无法完成。
> &gt; &gt; 官方文档对于在iterative
> &gt; &gt;
> stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
> &gt; &gt; 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
> &gt; &gt; ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
> &gt; &gt;
> &gt; &gt; ---原始邮件---
> &gt; &gt; 发件人: "Congxian Qiu"<[hidden email]&amp;gt;
> &gt; &gt; 发送时间: 2020年8月24日(周一) 晚上8:21
> &gt; &gt; 收件人: "user-zh"<[hidden email]&amp;gt;;
> &gt; &gt; 主题: Re: 流处理任务中checkpoint失败
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; Hi
> &gt; &gt; &amp;nbsp;&amp;nbsp; 从报错 ”Exceeded checkpoint tolerable failure
> threshold“ 看,你的
> &gt; &gt; checkpoint
> &gt; &gt; 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
> &gt; &gt; &amp;nbsp;&amp;nbsp; 另外从配置看,你开启了 unalign
> checkpoint,这个是上述文章中暂时没有设计的地方。
> &gt; &gt;
> &gt; &gt; [1] https://zhuanlan.zhihu.com/p/87131964
> &gt; &gt; Best,
> &gt; &gt; Congxian
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; Robert.Zhang <[hidden email]&amp;gt; 于2020年8月21日周五 下午6:31写道:
> &gt; &gt;
> &gt; &gt; &amp;gt; Hello all,
> &gt; &gt; &amp;gt; 目前遇到一个问题,在iterative stream job
> &gt; &gt; &amp;gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
> &gt; &gt; &amp;gt; 测试state
> &gt; 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
> &gt; &gt; &amp;gt; Exceeded checkpoint tolerable failure threshold.的报错
> &gt; &gt; &amp;gt;
> &gt; &gt; &amp;gt;
> &gt; &gt; &amp;gt; 配置如下:
> &gt; &gt; &amp;gt; env.enableCheckpointing(10000,
> CheckpointingMode.EXACTLY_ONCE,
> &gt; true);
> &gt; &gt; &amp;gt; CheckpointConfig checkpointConfig =
> env.getCheckpointConfig();
> &gt; &gt; &amp;gt; checkpointConfig.setCheckpointTimeout(600000);
> &gt; &gt; &amp;gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000);
> &gt; &gt; &amp;gt; checkpointConfig.setMaxConcurrentCheckpoints(4);
> &gt; &gt; &amp;gt;
> &gt; &gt; &amp;gt;
> &gt; &gt;
> &gt;
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> &gt; &gt; &amp;gt; checkpointConfig.setPreferCheckpointForRecovery(true);
> &gt; &gt; &amp;gt; checkpointConfig.setTolerableCheckpointFailureNumber(2);
> &gt; &gt; &amp;gt; checkpointConfig.enableUnalignedCheckpoints();
> &gt; &gt; &amp;gt;
> &gt; &gt; &amp;gt;
> &gt; &gt; &amp;gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?
> &gt;
>
Reply | Threaded
Open this post in threaded view
|

回复: 回复: 流处理任务中checkpoint失败

Robert.Zhang
Hi Yun and CongXian


Source是使用两个kafkaconsumer 分别做了个map,没有单独进行实现


看了下代码,iteration head这边processinput这个方法进行了overwrite,
内部使用了一个队列来进行消息传递,
stream task这一块是使用了个mailbox的方式来做的,
暂时我还没有深入理解这一块,
但是感觉可能是overwrite之后跟普通的stream task处理有区别






------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年8月27日(星期四) 晚上9:20
收件人:&nbsp;"user-zh"<[hidden email]&gt;;
抄送:&nbsp;"Yun Gao"<[hidden email]&gt;;
主题:&nbsp;Re: 回复: 流处理任务中checkpoint失败



Hi
&nbsp;&nbsp; 从代码暂时没有看出问题,不确定 迭代 作业的 checkpoint 是否有特殊的地方,我抄送了一个对迭代这块更了解的人(Yun
Gao),或许他在这块有一些建议

Best,
Congxian


Yun Tang <[hidden email]&gt; 于2020年8月27日周四 下午5:10写道:

&gt; Hi Robert
&gt;
&gt; 你的两个source
&gt; firstSource和secondSource是自己实现的么,一种怀疑是source在没有数据时持锁[1][2]导致checkpoint
&gt; barrier并没有下发。
&gt; 建议使用jstack查看在没有数据下发时,source相关task的java调用栈,观察是否存在等待锁释放
&gt;
&gt; [1]
&gt; https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L916
&gt; [2]
&gt; https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L92
&gt;
&gt; 祝好
&gt; 唐云
&gt; ________________________________
&gt; From: Robert.Zhang <[hidden email]&gt;
&gt; Sent: Wednesday, August 26, 2020 22:17
&gt; To: user-zh <[hidden email]&gt;
&gt; Subject: 回复: 流处理任务中checkpoint失败
&gt;
&gt; Hi
&gt;
&gt;
&gt; 代码大致如下:
&gt; DataStream broad=env.readFrom(...).broad;
&gt; DataStream firstSource=env.readFrom(...);
&gt; DataStream secondSource=env.readFrom(...);
&gt;
&gt;
&gt; DataStream&amp;nbsp; union=firstSource.union(secondSource);
&gt; IterativeStream iterativeStream=union.keyby(...).process(...).iterate();
&gt;
&gt; DataStream&amp;nbsp; result=iterativeStream.closeWith(
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; iterativeStream
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; .keyby(...)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; .connect(broad)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; .process(...));
&gt; result.addSink(...);
&gt;
&gt;
&gt; 是否是代码的书写上有问题呢?不胜感激,Thanks all
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <
&gt; [hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年8月26日(星期三) 晚上7:18
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: 流处理任务中checkpoint失败
&gt;
&gt;
&gt;
&gt; Hi
&gt; &amp;nbsp;&amp;nbsp; 按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到
&gt; barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看
&gt; Best,
&gt; Congxian
&gt;
&gt;
&gt; Robert.Zhang <[hidden email]&amp;gt; 于2020年8月26日周三 上午11:43写道:
&gt;
&gt; &amp;gt; Hi Congxian,
&gt; &amp;gt;
&gt; &amp;gt; 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。
&gt; &amp;gt; 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。
&gt; &amp;gt; 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据,
&gt; &amp;gt; 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢?
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; ---原始邮件---
&gt; &amp;gt; *发件人:* "Congxian Qiu"<[hidden email]&amp;gt;
&gt; &amp;gt; *发送时间:* 2020年8月25日(周二) 下午5:33
&gt; &amp;gt; *收件人:* "user-zh"<[hidden email]&amp;gt;;
&gt; &amp;gt; *主题:* Re: 流处理任务中checkpoint失败
&gt; &amp;gt;
&gt; &amp;gt; Hi
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; 对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source
&gt; 没有完成的话,或许看一下相应并发(没完成 snapshot
&gt; &amp;gt; 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
&gt; &amp;gt; snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
&gt; &amp;gt; Best,
&gt; &amp;gt; Congxian
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; Robert.Zhang <[hidden email]&amp;gt; 于2020年8月25日周二 上午12:58写道:
&gt; &amp;gt;
&gt; &amp;gt; &amp;gt; 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration
&gt; source的checkpoint始终无法完成。
&gt; &amp;gt; &amp;gt; 官方文档对于在iterative
&gt; &amp;gt; &amp;gt;
&gt; stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
&gt; &amp;gt; &amp;gt; 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
&gt; &amp;gt; &amp;gt; ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; ---原始邮件---
&gt; &amp;gt; &amp;gt; 发件人: "Congxian Qiu"<[hidden email]&amp;amp;gt;
&gt; &amp;gt; &amp;gt; 发送时间: 2020年8月24日(周一) 晚上8:21
&gt; &amp;gt; &amp;gt; 收件人: "user-zh"<[hidden email]&amp;amp;gt;;
&gt; &amp;gt; &amp;gt; 主题: Re: 流处理任务中checkpoint失败
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; Hi
&gt; &amp;gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; 从报错 ”Exceeded checkpoint tolerable failure
&gt; threshold“ 看,你的
&gt; &amp;gt; &amp;gt; checkpoint
&gt; &amp;gt; &amp;gt; 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
&gt; &amp;gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; 另外从配置看,你开启了 unalign
&gt; checkpoint,这个是上述文章中暂时没有设计的地方。
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; [1] https://zhuanlan.zhihu.com/p/87131964
&gt; &amp;gt; &amp;gt; Best,
&gt; &amp;gt; &amp;gt; Congxian
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; Robert.Zhang <[hidden email]&amp;amp;gt; 于2020年8月21日周五 下午6:31写道:
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt; Hello all,
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 目前遇到一个问题,在iterative stream job
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 测试state
&gt; &amp;gt; 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
&gt; &amp;gt; &amp;gt; &amp;amp;gt; Exceeded checkpoint tolerable failure threshold.的报错
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 配置如下:
&gt; &amp;gt; &amp;gt; &amp;amp;gt; env.enableCheckpointing(10000,
&gt; CheckpointingMode.EXACTLY_ONCE,
&gt; &amp;gt; true);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; CheckpointConfig checkpointConfig =
&gt; env.getCheckpointConfig();
&gt; &amp;gt; &amp;gt; &amp;amp;gt; checkpointConfig.setCheckpointTimeout(600000);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; checkpointConfig.setMaxConcurrentCheckpoints(4);
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt;
&gt; checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; checkpointConfig.setPreferCheckpointForRecovery(true);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; checkpointConfig.setTolerableCheckpointFailureNumber(2);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; checkpointConfig.enableUnalignedCheckpoints();
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?
&gt; &amp;gt;
&gt;