Hi Congxian, 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据, 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢? ---原始邮件--- 发件人: "Congxian Qiu"<[hidden email]> 发送时间: 2020年8月25日(周二) 下午5:33 收件人: "user-zh"<[hidden email]>; 主题: Re: 流处理任务中checkpoint失败 Hi 对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的 snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。 Best, Congxian Robert.Zhang <[hidden email]> 于2020年8月25日周二 上午12:58写道: > 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。 > 官方文档对于在iterative > stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。 > 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游 > ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的 > > ---原始邮件--- > 发件人: "Congxian Qiu"<[hidden email]> > 发送时间: 2020年8月24日(周一) 晚上8:21 > 收件人: "user-zh"<[hidden email]>; > 主题: Re: 流处理任务中checkpoint失败 > > > Hi > 从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的 > checkpoint > 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助 > 另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。 > > [1] https://zhuanlan.zhihu.com/p/87131964 > Best, > Congxian > > > Robert.Zhang <[hidden email]> 于2020年8月21日周五 下午6:31写道: > > > Hello all, > > 目前遇到一个问题,在iterative stream job > > 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功 > > 测试state 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException: > > Exceeded checkpoint tolerable failure threshold.的报错 > > > > > > 配置如下: > > env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE, true); > > CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > > checkpointConfig.setCheckpointTimeout(600000); > > checkpointConfig.setMinPauseBetweenCheckpoints(60000); > > checkpointConfig.setMaxConcurrentCheckpoints(4); > > > > > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > checkpointConfig.setPreferCheckpointForRecovery(true); > > checkpointConfig.setTolerableCheckpointFailureNumber(2); > > checkpointConfig.enableUnalignedCheckpoints(); > > > > > > 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗? |
Hi
按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到 barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看 Best, Congxian Robert.Zhang <[hidden email]> 于2020年8月26日周三 上午11:43写道: > Hi Congxian, > > 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。 > 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。 > 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据, > 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。 > > > > 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢? > > > > > > ---原始邮件--- > *发件人:* "Congxian Qiu"<[hidden email]> > *发送时间:* 2020年8月25日(周二) 下午5:33 > *收件人:* "user-zh"<[hidden email]>; > *主题:* Re: 流处理任务中checkpoint失败 > > Hi > 对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot > 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的 > snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。 > Best, > Congxian > > > Robert.Zhang <[hidden email]> 于2020年8月25日周二 上午12:58写道: > > > 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。 > > 官方文档对于在iterative > > stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。 > > 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游 > > ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的 > > > > ---原始邮件--- > > 发件人: "Congxian Qiu"<[hidden email]> > > 发送时间: 2020年8月24日(周一) 晚上8:21 > > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 流处理任务中checkpoint失败 > > > > > > Hi > > 从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的 > > checkpoint > > 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助 > > 另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。 > > > > [1] https://zhuanlan.zhihu.com/p/87131964 > > Best, > > Congxian > > > > > > Robert.Zhang <[hidden email]> 于2020年8月21日周五 下午6:31写道: > > > > > Hello all, > > > 目前遇到一个问题,在iterative stream job > > > 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功 > > > 测试state > 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException: > > > Exceeded checkpoint tolerable failure threshold.的报错 > > > > > > > > > 配置如下: > > > env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE, > true); > > > CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > > > checkpointConfig.setCheckpointTimeout(600000); > > > checkpointConfig.setMinPauseBetweenCheckpoints(60000); > > > checkpointConfig.setMaxConcurrentCheckpoints(4); > > > > > > > > > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > > checkpointConfig.setPreferCheckpointForRecovery(true); > > > checkpointConfig.setTolerableCheckpointFailureNumber(2); > > > checkpointConfig.enableUnalignedCheckpoints(); > > > > > > > > > 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗? > |
Hi
代码大致如下: DataStream broad=env.readFrom(...).broad; DataStream firstSource=env.readFrom(...); DataStream secondSource=env.readFrom(...); DataStream union=firstSource.union(secondSource); IterativeStream iterativeStream=union.keyby(...).process(...).iterate(); DataStream result=iterativeStream.closeWith( iterativeStream .keyby(...) .connect(broad) .process(...)); result.addSink(...); 是否是代码的书写上有问题呢?不胜感激,Thanks all ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年8月26日(星期三) 晚上7:18 收件人: "user-zh"<[hidden email]>; 主题: Re: 流处理任务中checkpoint失败 Hi 按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到 barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看 Best, Congxian Robert.Zhang <[hidden email]> 于2020年8月26日周三 上午11:43写道: > Hi Congxian, > > 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。 > 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。 > 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据, > 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。 > > > > 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢? > > > > > > ---原始邮件--- > *发件人:* "Congxian Qiu"<[hidden email]> > *发送时间:* 2020年8月25日(周二) 下午5:33 > *收件人:* "user-zh"<[hidden email]>; > *主题:* Re: 流处理任务中checkpoint失败 > > Hi > 对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot > 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的 > snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。 > Best, > Congxian > > > Robert.Zhang <[hidden email]> 于2020年8月25日周二 上午12:58写道: > > > 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。 > > 官方文档对于在iterative > > stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。 > > 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游 > > ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的 > > > > ---原始邮件--- > > 发件人: "Congxian Qiu"<[hidden email]&gt; > > 发送时间: 2020年8月24日(周一) 晚上8:21 > > 收件人: "user-zh"<[hidden email]&gt;; > > 主题: Re: 流处理任务中checkpoint失败 > > > > > > Hi > > &nbsp;&nbsp; 从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的 > > checkpoint > > 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助 > > &nbsp;&nbsp; 另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。 > > > > [1] https://zhuanlan.zhihu.com/p/87131964 > > Best, > > Congxian > > > > > > Robert.Zhang <[hidden email]&gt; 于2020年8月21日周五 下午6:31写道: > > > > &gt; Hello all, > > &gt; 目前遇到一个问题,在iterative stream job > > &gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功 > > &gt; 测试state > 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException: > > &gt; Exceeded checkpoint tolerable failure threshold.的报错 > > &gt; > > &gt; > > &gt; 配置如下: > > &gt; env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE, > true); > > &gt; CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > > &gt; checkpointConfig.setCheckpointTimeout(600000); > > &gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000); > > &gt; checkpointConfig.setMaxConcurrentCheckpoints(4); > > &gt; > > &gt; > > > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > &gt; checkpointConfig.setPreferCheckpointForRecovery(true); > > &gt; checkpointConfig.setTolerableCheckpointFailureNumber(2); > > &gt; checkpointConfig.enableUnalignedCheckpoints(); > > &gt; > > &gt; > > &gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗? > |
Hi Robert
你的两个source firstSource和secondSource是自己实现的么,一种怀疑是source在没有数据时持锁[1][2]导致checkpoint barrier并没有下发。 建议使用jstack查看在没有数据下发时,source相关task的java调用栈,观察是否存在等待锁释放 [1] https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L916 [2] https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L92 祝好 唐云 ________________________________ From: Robert.Zhang <[hidden email]> Sent: Wednesday, August 26, 2020 22:17 To: user-zh <[hidden email]> Subject: 回复: 流处理任务中checkpoint失败 Hi 代码大致如下: DataStream broad=env.readFrom(...).broad; DataStream firstSource=env.readFrom(...); DataStream secondSource=env.readFrom(...); DataStream union=firstSource.union(secondSource); IterativeStream iterativeStream=union.keyby(...).process(...).iterate(); DataStream result=iterativeStream.closeWith( iterativeStream .keyby(...) .connect(broad) .process(...)); result.addSink(...); 是否是代码的书写上有问题呢?不胜感激,Thanks all ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年8月26日(星期三) 晚上7:18 收件人: "user-zh"<[hidden email]>; 主题: Re: 流处理任务中checkpoint失败 Hi 按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到 barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看 Best, Congxian Robert.Zhang <[hidden email]> 于2020年8月26日周三 上午11:43写道: > Hi Congxian, > > 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。 > 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。 > 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据, > 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。 > > > > 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢? > > > > > > ---原始邮件--- > *发件人:* "Congxian Qiu"<[hidden email]> > *发送时间:* 2020年8月25日(周二) 下午5:33 > *收件人:* "user-zh"<[hidden email]>; > *主题:* Re: 流处理任务中checkpoint失败 > > Hi > 对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot > 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的 > snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。 > Best, > Congxian > > > Robert.Zhang <[hidden email]> 于2020年8月25日周二 上午12:58写道: > > > 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。 > > 官方文档对于在iterative > > stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。 > > 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游 > > ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的 > > > > ---原始邮件--- > > 发件人: "Congxian Qiu"<[hidden email]&gt; > > 发送时间: 2020年8月24日(周一) 晚上8:21 > > 收件人: "user-zh"<[hidden email]&gt;; > > 主题: Re: 流处理任务中checkpoint失败 > > > > > > Hi > > &nbsp;&nbsp; 从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的 > > checkpoint > > 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助 > > &nbsp;&nbsp; 另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。 > > > > [1] https://zhuanlan.zhihu.com/p/87131964 > > Best, > > Congxian > > > > > > Robert.Zhang <[hidden email]&gt; 于2020年8月21日周五 下午6:31写道: > > > > &gt; Hello all, > > &gt; 目前遇到一个问题,在iterative stream job > > &gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功 > > &gt; 测试state > 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException: > > &gt; Exceeded checkpoint tolerable failure threshold.的报错 > > &gt; > > &gt; > > &gt; 配置如下: > > &gt; env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE, > true); > > &gt; CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > > &gt; checkpointConfig.setCheckpointTimeout(600000); > > &gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000); > > &gt; checkpointConfig.setMaxConcurrentCheckpoints(4); > > &gt; > > &gt; > > > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > &gt; checkpointConfig.setPreferCheckpointForRecovery(true); > > &gt; checkpointConfig.setTolerableCheckpointFailureNumber(2); > > &gt; checkpointConfig.enableUnalignedCheckpoints(); > > &gt; > > &gt; > > &gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗? > |
Hi
从代码暂时没有看出问题,不确定 迭代 作业的 checkpoint 是否有特殊的地方,我抄送了一个对迭代这块更了解的人(Yun Gao),或许他在这块有一些建议 Best, Congxian Yun Tang <[hidden email]> 于2020年8月27日周四 下午5:10写道: > Hi Robert > > 你的两个source > firstSource和secondSource是自己实现的么,一种怀疑是source在没有数据时持锁[1][2]导致checkpoint > barrier并没有下发。 > 建议使用jstack查看在没有数据下发时,source相关task的java调用栈,观察是否存在等待锁释放 > > [1] > https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L916 > [2] > https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L92 > > 祝好 > 唐云 > ________________________________ > From: Robert.Zhang <[hidden email]> > Sent: Wednesday, August 26, 2020 22:17 > To: user-zh <[hidden email]> > Subject: 回复: 流处理任务中checkpoint失败 > > Hi > > > 代码大致如下: > DataStream broad=env.readFrom(...).broad; > DataStream firstSource=env.readFrom(...); > DataStream secondSource=env.readFrom(...); > > > DataStream union=firstSource.union(secondSource); > IterativeStream iterativeStream=union.keyby(...).process(...).iterate(); > > DataStream result=iterativeStream.closeWith( > > > > iterativeStream > > > > .keyby(...) > > > > .connect(broad) > > > > .process(...)); > result.addSink(...); > > > 是否是代码的书写上有问题呢?不胜感激,Thanks all > > > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > < > [hidden email]>; > 发送时间: 2020年8月26日(星期三) 晚上7:18 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 流处理任务中checkpoint失败 > > > > Hi > 按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到 > barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看 > Best, > Congxian > > > Robert.Zhang <[hidden email]> 于2020年8月26日周三 上午11:43写道: > > > Hi Congxian, > > > > 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。 > > 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。 > > 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据, > > 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。 > > > > > > > > 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢? > > > > > > > > > > > > ---原始邮件--- > > *发件人:* "Congxian Qiu"<[hidden email]> > > *发送时间:* 2020年8月25日(周二) 下午5:33 > > *收件人:* "user-zh"<[hidden email]>; > > *主题:* Re: 流处理任务中checkpoint失败 > > > > Hi > > 对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source > 没有完成的话,或许看一下相应并发(没完成 snapshot > > 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的 > > snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。 > > Best, > > Congxian > > > > > > Robert.Zhang <[hidden email]> 于2020年8月25日周二 上午12:58写道: > > > > > 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration > source的checkpoint始终无法完成。 > > > 官方文档对于在iterative > > > > stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。 > > > 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游 > > > ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的 > > > > > > ---原始邮件--- > > > 发件人: "Congxian Qiu"<[hidden email]&gt; > > > 发送时间: 2020年8月24日(周一) 晚上8:21 > > > 收件人: "user-zh"<[hidden email]&gt;; > > > 主题: Re: 流处理任务中checkpoint失败 > > > > > > > > > Hi > > > &nbsp;&nbsp; 从报错 ”Exceeded checkpoint tolerable failure > threshold“ 看,你的 > > > checkpoint > > > 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助 > > > &nbsp;&nbsp; 另外从配置看,你开启了 unalign > checkpoint,这个是上述文章中暂时没有设计的地方。 > > > > > > [1] https://zhuanlan.zhihu.com/p/87131964 > > > Best, > > > Congxian > > > > > > > > > Robert.Zhang <[hidden email]&gt; 于2020年8月21日周五 下午6:31写道: > > > > > > &gt; Hello all, > > > &gt; 目前遇到一个问题,在iterative stream job > > > &gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功 > > > &gt; 测试state > > 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException: > > > &gt; Exceeded checkpoint tolerable failure threshold.的报错 > > > &gt; > > > &gt; > > > &gt; 配置如下: > > > &gt; env.enableCheckpointing(10000, > CheckpointingMode.EXACTLY_ONCE, > > true); > > > &gt; CheckpointConfig checkpointConfig = > env.getCheckpointConfig(); > > > &gt; checkpointConfig.setCheckpointTimeout(600000); > > > &gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000); > > > &gt; checkpointConfig.setMaxConcurrentCheckpoints(4); > > > &gt; > > > &gt; > > > > > > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > > &gt; checkpointConfig.setPreferCheckpointForRecovery(true); > > > &gt; checkpointConfig.setTolerableCheckpointFailureNumber(2); > > > &gt; checkpointConfig.enableUnalignedCheckpoints(); > > > &gt; > > > &gt; > > > &gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗? > > > |
Hi Yun and CongXian
Source是使用两个kafkaconsumer 分别做了个map,没有单独进行实现 看了下代码,iteration head这边processinput这个方法进行了overwrite, 内部使用了一个队列来进行消息传递, stream task这一块是使用了个mailbox的方式来做的, 暂时我还没有深入理解这一块, 但是感觉可能是overwrite之后跟普通的stream task处理有区别 ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年8月27日(星期四) 晚上9:20 收件人: "user-zh"<[hidden email]>; 抄送: "Yun Gao"<[hidden email]>; 主题: Re: 回复: 流处理任务中checkpoint失败 Hi 从代码暂时没有看出问题,不确定 迭代 作业的 checkpoint 是否有特殊的地方,我抄送了一个对迭代这块更了解的人(Yun Gao),或许他在这块有一些建议 Best, Congxian Yun Tang <[hidden email]> 于2020年8月27日周四 下午5:10写道: > Hi Robert > > 你的两个source > firstSource和secondSource是自己实现的么,一种怀疑是source在没有数据时持锁[1][2]导致checkpoint > barrier并没有下发。 > 建议使用jstack查看在没有数据下发时,source相关task的java调用栈,观察是否存在等待锁释放 > > [1] > https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L916 > [2] > https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L92 > > 祝好 > 唐云 > ________________________________ > From: Robert.Zhang <[hidden email]> > Sent: Wednesday, August 26, 2020 22:17 > To: user-zh <[hidden email]> > Subject: 回复: 流处理任务中checkpoint失败 > > Hi > > > 代码大致如下: > DataStream broad=env.readFrom(...).broad; > DataStream firstSource=env.readFrom(...); > DataStream secondSource=env.readFrom(...); > > > DataStream&nbsp; union=firstSource.union(secondSource); > IterativeStream iterativeStream=union.keyby(...).process(...).iterate(); > > DataStream&nbsp; result=iterativeStream.closeWith( > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; iterativeStream > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; .keyby(...) > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; .connect(broad) > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; > &nbsp; &nbsp; &nbsp; .process(...)); > result.addSink(...); > > > 是否是代码的书写上有问题呢?不胜感激,Thanks all > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人: > "user-zh" > < > [hidden email]&gt;; > 发送时间:&nbsp;2020年8月26日(星期三) 晚上7:18 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: 流处理任务中checkpoint失败 > > > > Hi > &nbsp;&nbsp; 按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到 > barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看 > Best, > Congxian > > > Robert.Zhang <[hidden email]&gt; 于2020年8月26日周三 上午11:43写道: > > &gt; Hi Congxian, > &gt; > &gt; 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。 > &gt; 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。 > &gt; 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据, > &gt; 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。 > &gt; > &gt; > &gt; > &gt; 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢? > &gt; > &gt; > &gt; > &gt; > &gt; > &gt; ---原始邮件--- > &gt; *发件人:* "Congxian Qiu"<[hidden email]&gt; > &gt; *发送时间:* 2020年8月25日(周二) 下午5:33 > &gt; *收件人:* "user-zh"<[hidden email]&gt;; > &gt; *主题:* Re: 流处理任务中checkpoint失败 > &gt; > &gt; Hi > &gt;&nbsp;&nbsp;&nbsp; 对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source > 没有完成的话,或许看一下相应并发(没完成 snapshot > &gt; 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的 > &gt; snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。 > &gt; Best, > &gt; Congxian > &gt; > &gt; > &gt; Robert.Zhang <[hidden email]&gt; 于2020年8月25日周二 上午12:58写道: > &gt; > &gt; &gt; 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration > source的checkpoint始终无法完成。 > &gt; &gt; 官方文档对于在iterative > &gt; &gt; > stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。 > &gt; &gt; 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游 > &gt; &gt; ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的 > &gt; &gt; > &gt; &gt; ---原始邮件--- > &gt; &gt; 发件人: "Congxian Qiu"<[hidden email]&amp;gt; > &gt; &gt; 发送时间: 2020年8月24日(周一) 晚上8:21 > &gt; &gt; 收件人: "user-zh"<[hidden email]&amp;gt;; > &gt; &gt; 主题: Re: 流处理任务中checkpoint失败 > &gt; &gt; > &gt; &gt; > &gt; &gt; Hi > &gt; &gt; &amp;nbsp;&amp;nbsp; 从报错 ”Exceeded checkpoint tolerable failure > threshold“ 看,你的 > &gt; &gt; checkpoint > &gt; &gt; 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助 > &gt; &gt; &amp;nbsp;&amp;nbsp; 另外从配置看,你开启了 unalign > checkpoint,这个是上述文章中暂时没有设计的地方。 > &gt; &gt; > &gt; &gt; [1] https://zhuanlan.zhihu.com/p/87131964 > &gt; &gt; Best, > &gt; &gt; Congxian > &gt; &gt; > &gt; &gt; > &gt; &gt; Robert.Zhang <[hidden email]&amp;gt; 于2020年8月21日周五 下午6:31写道: > &gt; &gt; > &gt; &gt; &amp;gt; Hello all, > &gt; &gt; &amp;gt; 目前遇到一个问题,在iterative stream job > &gt; &gt; &amp;gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功 > &gt; &gt; &amp;gt; 测试state > &gt; 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException: > &gt; &gt; &amp;gt; Exceeded checkpoint tolerable failure threshold.的报错 > &gt; &gt; &amp;gt; > &gt; &gt; &amp;gt; > &gt; &gt; &amp;gt; 配置如下: > &gt; &gt; &amp;gt; env.enableCheckpointing(10000, > CheckpointingMode.EXACTLY_ONCE, > &gt; true); > &gt; &gt; &amp;gt; CheckpointConfig checkpointConfig = > env.getCheckpointConfig(); > &gt; &gt; &amp;gt; checkpointConfig.setCheckpointTimeout(600000); > &gt; &gt; &amp;gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000); > &gt; &gt; &amp;gt; checkpointConfig.setMaxConcurrentCheckpoints(4); > &gt; &gt; &amp;gt; > &gt; &gt; &amp;gt; > &gt; &gt; > &gt; > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > &gt; &gt; &amp;gt; checkpointConfig.setPreferCheckpointForRecovery(true); > &gt; &gt; &amp;gt; checkpointConfig.setTolerableCheckpointFailureNumber(2); > &gt; &gt; &amp;gt; checkpointConfig.enableUnalignedCheckpoints(); > &gt; &gt; &amp;gt; > &gt; &gt; &amp;gt; > &gt; &gt; &amp;gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗? > &gt; > |
Free forum by Nabble | Edit this page |