各位好,我的项目的流计算模型source(kafka)->filter->keyby->window->aggregate->sink(hbase),现在发现window的subtask的checkpoint的stage
size越来越大,请问是什么原因啊? -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
你好
这个问题问得有点稍微宽泛,因为并没有描述你所认为的checkpoint state size越来越大的周期。checkpoint state size变大有几个原因: 1. 上游数据量增大。 2. window设置时间较长,尚未触发,导致window内积攒的数据比较大。 3. window的类型决定了所需要存储的state size较大。 可以参考社区的文档[1] window state的存储空间问题。另外,在上游数据量没有显著变化的时候,若干窗口周期后的checkpoint state size应该是比较稳定的,由于未明确你的观察周期,所以只能给出比较宽泛的建议。 [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#useful-state-size-considerations 祝好 唐云 ________________________________ From: ReignsDYL <[hidden email]> Sent: Wednesday, June 26, 2019 14:22 To: [hidden email] Subject: checkpoint stage size的问题 各位好,我的项目的流计算模型source(kafka)->filter->keyby->window->aggregate->sink(hbase),现在发现window的subtask的checkpoint的stage size越来越大,请问是什么原因啊? -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
老师你好,首先感谢你在百忙之中回复我。
我这面观察到的现象是,当有数据流入时,每个checkpoint的stage size比上一个checkpoint多几百k左右,只要数据持续流入,这个stage size就一直增长,当没有数据流入时,checkpoint的stage size就维持不变了,再有数据流入时,stage size就在原来基础上继续增长。 数据流: SingleOutputStreamOperator<StudentAggResult> studentSubjectStream = dataStream .filter(new Question2SubjectFilter()) .keyBy(new TaskStudentSubjectKeySelector()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .aggregate(new StudentSubjectScoreAgg()); studentSubjectStream.addSink(getKafkaProducer(KafkaTopic.STUDENT_SUBJECT_AGG.getTopic(), StudentAggResult.class)); 聚合函数: public abstract class BaseAgg<T, R extends IMergeable> implements AggregateFunction<T, R, R> { public abstract R create(T input); public abstract void merge(R aggResult, T t); @Override public R createAccumulator() { return null; } @Override public R add(T t, R aggResult) { if (aggResult == null) { aggResult = create(t); } merge(aggResult, t); return aggResult; } @Override public R getResult(R aggResult) { return aggResult; } @Override public R merge(R aggResult, R acc1) { if (acc1 == null) { return aggResult; } if (aggResult == null) { return acc1; } aggResult.merge(acc1); return aggResult; } } checkpoint配置: env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 状态存储通过rocksdb。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by Yun Tang
这是web ui的监控
<http://apache-flink.147419.n8.nabble.com/file/t26/checkpoint.png> -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
你好
从附件的web监控看,其实你的整体checkpoint state其实很小(只有20几MB),所以对于这个问题其实有些过度关注了。 关于checkpoint state的变化,需要观察不同operator的情况,可以点开详细页看每个并发的情况。对比operator state和window所使用的keyed state的变化情况。我估计keyed state部分会有些许波动,主要是因为你使用的是RocksDB state backend,其实上传的是rocksDB的sst文件,当register timer时,window state会进行存储,当onTimer时,相关state会取出并更新或者删除,这里涉及到一个写放大和compaction的问题,rocksDB对某个key的删除不会直接对应物理上的存储的立刻减少。 祝好 唐云 ________________________________ From: ReignsDYL <[hidden email]> Sent: Wednesday, June 26, 2019 17:38 To: [hidden email] Subject: Re: checkpoint stage size的问题 这是web ui的监控 <http://apache-flink.147419.n8.nabble.com/file/t26/checkpoint.png> -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
您好,感谢您的回复。
是这样,开始可能只是20几MB,但是只要有数据流入,它就一直变大,几个小时后,就达到了几百MB,并没有发现清理或者变小的现象。operator的每个subtask的stage zise也是均匀的。 另外,我简单的写了个demo,就是从kafka读数据,然后保存到hbase,我发现那个checkpoint的stage size虽然只有几十k,但是也在慢慢增长,每次比前一个均匀增加。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by Yun Tang
你好,
如果有需要session窗口可能保持很长时间,数据量也很大,这种窗口会导致checkpoint stage size变的非常大 有没有一种机制可能让超过一定时间的状态失效并且丢弃掉? 在 2019-06-26 16:23:13,"Yun Tang" <[hidden email]> 写道: >你好 > >这个问题问得有点稍微宽泛,因为并没有描述你所认为的checkpoint state size越来越大的周期。checkpoint state size变大有几个原因: > > 1. 上游数据量增大。 > 2. window设置时间较长,尚未触发,导致window内积攒的数据比较大。 > 3. window的类型决定了所需要存储的state size较大。 > >可以参考社区的文档[1] window state的存储空间问题。另外,在上游数据量没有显著变化的时候,若干窗口周期后的checkpoint state size应该是比较稳定的,由于未明确你的观察周期,所以只能给出比较宽泛的建议。 > >[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#useful-state-size-considerations > >祝好 >唐云 >________________________________ >From: ReignsDYL <[hidden email]> >Sent: Wednesday, June 26, 2019 14:22 >To: [hidden email] >Subject: checkpoint stage size的问题 > >各位好,我的项目的流计算模型source(kafka)->filter->keyby->window->aggregate->sink(hbase),现在发现window的subtask的checkpoint的stage >size越来越大,请问是什么原因啊? > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |