flink使用StateBackend问题

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

flink使用StateBackend问题

守护
各位Flink社区大佬,大家好!


现象:我是在代码中配置StateBackend的,代码是这样写的env.setStateBackend(new RocksDBStateBackend("hdfs://host51:9000/flink/checkpoints",true)),但是我发现在下面的SQLwindow中始终在checkpoint那就过不去,总是报n/a,

下面我就把代码中的这句给注释掉了,在flink-conf.xml文件中配置
state.checkpoints.dir: hdfs://host51:9000/flink/flink-checkpoints这样就能正常执行,
请问SQLwindow和RocksDBStateBackend有什么冲突吗,上面的原因是什么?
Reply | Threaded
Open this post in threaded view
|

Re: flink使用StateBackend问题

Wesley Peng
Hi

on 2019/9/3 12:14, 々守护々 wrote:
> 现象:我是在代码中配置StateBackend的,代码是这样写的env.setStateBackend(new RocksDBStateBackend("hdfs://host51:9000/flink/checkpoints",true)),但是我发现在下面的SQLwindow中始终在checkpoint那就过不去,总是报n/a,
>
> 下面我就把代码中的这句给注释掉了,在flink-conf.xml文件中配置
> state.checkpoints.dir: hdfs://host51:9000/flink/flink-checkpoints这样就能正常执行,
> 请问SQLwindow和RocksDBStateBackend有什么冲突吗,上面的原因是什么?


It seems like a filesystem issue, though I have no experience on it too.

regards.
Reply | Threaded
Open this post in threaded view
|

回复: flink使用StateBackend问题

守护
1.我用相同的代码逻辑用FlinkAPI写window同样env.setStateBackend(new RocksDBStateBackend("hdfs://host51:9000/flink/checkpoints",true))设置rocksDB,在checkpoint时候能执行,用时也和小Completed checkpoint 142 for job 25a50baff7d16ee22aecb7b1******** (806418 bytes in 426 ms).
2.当我用SQLwindow窗口执行,也是配置的rocksDB,就会checkpoint不成功,window的checkpoint全都是n/a
2019-09-03 15:17:52,142 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 1 from 915988718742f11135a6e96ba42c4e35 of job fd5010cbf20501339f1136600f0709c3. 2019-09-03 15:17:52,143 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 1 from 191ca20964de323300551d9a144c272c of job fd5010cbf20501339f1136600f0709c3. 2019-09-03 15:17:52,144 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 1 from 2a24625ac03ea6ac4b8632719768c502 of job fd5010cbf20501339f1136600f0709c3. 2019-09-03 15:17:52,144 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 1 from 51b376365d044c672677c38ca1caee78 of job fd5010cbf20501339f1136600f0709c3. 2019-09-03 15:17:52,227 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 1 from 9266a7307add46cb26f5a9352c82ceb0 of job fd5010cbf20501339f1136600f0709c3. 2019-09-03 15:18:21,881 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 4 from 24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3. 2019-09-03 15:18:21,882 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 3 from 24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3.
请问这个是什么问题呢?




------------------ 原始邮件 ------------------
发件人: "Wesley Peng"<[hidden email]>;
发送时间: 2019年9月3日(星期二) 下午3:24
收件人: "user-zh"<[hidden email]>;

主题: Re: flink使用StateBackend问题



Hi

on 2019/9/3 12:14, 々守护々 wrote:
> 现象:我是在代码中配置StateBackend的,代码是这样写的env.setStateBackend(new RocksDBStateBackend("hdfs://host51:9000/flink/checkpoints",true)),但是我发现在下面的SQLwindow中始终在checkpoint那就过不去,总是报n/a,
>
> 下面我就把代码中的这句给注释掉了,在flink-conf.xml文件中配置
> state.checkpoints.dir: hdfs://host51:9000/flink/flink-checkpoints这样就能正常执行,
> 请问SQLwindow和RocksDBStateBackend有什么冲突吗,上面的原因是什么?


It seems like a filesystem issue, though I have no experience on it too.

regards.
Reply | Threaded
Open this post in threaded view
|

Re: 回复: flink使用StateBackend问题

Wesley Peng


on 2019/9/3 15:38, 守护 wrote:
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 3 from 24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3.
> 请问这个是什么问题呢?

可以根据这些失败的task的id去查询这些任务落在哪一个taskmanager上,经过排查发现,是同一台机器,通过ui看到该机器流入的数据明显比别的流入量大
因此是因为数据倾斜导致了这个问题,追根溯源还是下游消费能力不足的问题

also reference:
https://juejin.im/post/5c374fe3e51d451bd1663756
Reply | Threaded
Open this post in threaded view
|

Re: 回复: flink使用StateBackend问题

Yun Tang
Hi


  1.  Checkpoint 超时时间设置是多少(是默认的10min么),如果超时时间太短,容易checkpoint failure
  2.  所有的subtask都是n/a 么,source task的checkpoint没有rocksDb的参与,与使用默认的MemoryStateBackend其实是一样的,不应该source task也没有完成checkpoint(除非一直都拿不到StreamTask里面的锁,一直都在process element)
  3.  作业的反压情况如何,是不是使用RocksDB时候存在严重的反压(back pressure)情况?如果作业反压的话,barrier一直都流不到下游,容易造成checkpoint超时。

建议分享一下作业webUI上的checkpoint 信息。

祝好
唐云
________________________________
From: Wesley Peng <[hidden email]>
Sent: Tuesday, September 3, 2019 15:44
To: [hidden email] <[hidden email]>
Subject: Re: 回复: flink使用StateBackend问题



on 2019/9/3 15:38, 守护 wrote:
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 3 from 24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3.
> 请问这个是什么问题呢?

可以根据这些失败的task的id去查询这些任务落在哪一个taskmanager上,经过排查发现,是同一台机器,通过ui看到该机器流入的数据明显比别的流入量大
因此是因为数据倾斜导致了这个问题,追根溯源还是下游消费能力不足的问题

also reference:
https://juejin.im/post/5c374fe3e51d451bd1663756