大佬们好,我一个flink任务,计算一分钟内的某项几项指标的中位数,总共5个指标,因为中位数计算需要全窗口数据排序,所以计算比较复杂,现在遇到的问题的是一旦开启checkpoint任务就从source端开始反压严重,但关闭checkpoint就正常运行.
目前优化的步骤有: 1.语义放弃exactlyonce 改到atleast 2.分析发现keyby过程中有数据倾斜,已改成分布聚合,在第一轮聚合中key后添加随机数,在去除key后缀进行第二轮聚合 3.计算过程中使用RoaringBitmap作为中间数据缓存容器,最大限度减少内存损耗 4.增大并行度,提交时增大-yjm -ytm 内存配置 5.调整ck间隔时间 经过以上一些优化,任务性能确实有提高,绝大部分时间能正常运行,但每到业务高峰期(本公司业务最高峰在每天上午10点,这个点的数据量是平常的7~8倍),反压就立马非常严重,直至任务挂掉,但也没有oom之类的有效的错误日志输出 以上优化在各个版本都有测试,1.9,1.10,1.11都是一样的问题,总的来说的就是ck导致任务反压严重,不开启ck时,就算不经过上面的优化也正常运行,包括10点的业务高峰点 请大佬们给点优化思路,目前生产只能放弃ck,但这终归不是解决办法 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
input
.keyBy(<key selector>) .timeWindow(<duration>) .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到 window 里面 |
Hi zhanglachun,
你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢 Best, LakeShen 徐骁 <[hidden email]> 于2020年8月26日周三 上午2:10写道: > input > .keyBy(<key selector>) > .timeWindow(<duration>) > .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); > > 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到 > window 里面 > |
Hi
对于已经改为at least once的checkpoint,其在checkpoint时对于作业吞吐的影响只有task 同步阶段的snapshot,这个时间段的snapshot由于与task的主线程的数据访问持有同一把锁,会影响主线程的数据处理。但是就算这样,我也很怀疑checkpoint本身并不是导致早上10点高峰期无法运行的罪魁祸首。 使用异步的,支持增量的state backend (例如RocksDBStateBackend)会大大缓解该问题。 建议排查思路: 1. 检查使用的state backend类型 2. 检查是否存在sync阶段checkpoint用时久的问题(可以观察WEB UI上的checkpoint部分,看sync阶段的耗时) 3. 借助jstack等工具,检查执行checkpoint时,TM上的task执行逻辑,判断是哪里消耗CPU 祝好 唐云 ________________________________ From: LakeShen <[hidden email]> Sent: Wednesday, August 26, 2020 10:00 To: user-zh <[hidden email]> Subject: Re: flink checkpoint导致反压严重 Hi zhanglachun, 你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢 Best, LakeShen 徐骁 <[hidden email]> 于2020年8月26日周三 上午2:10写道: > input > .keyBy(<key selector>) > .timeWindow(<duration>) > .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); > > 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到 > window 里面 > |
Hi
对于开启 Checkpoint 之后导致反压的情况,如果希望在现在的基础上进行优化的话,则需要找到反压的原因是什么,可以尝试从最后一个被反压的算子开始排查,到底什么原因导致的,排查过程中,或许 Arthas[1] 可以有一些帮助 另外比较好奇的是,为什么反压会导致你的作业挂掉呢?作业挂掉的原因是啥呢 [1] https://github.com/alibaba/arthas Best, Congxian Yun Tang <[hidden email]> 于2020年8月26日周三 上午11:25写道: > Hi > > 对于已经改为at least once的checkpoint,其在checkpoint时对于作业吞吐的影响只有task > 同步阶段的snapshot,这个时间段的snapshot由于与task的主线程的数据访问持有同一把锁,会影响主线程的数据处理。但是就算这样,我也很怀疑checkpoint本身并不是导致早上10点高峰期无法运行的罪魁祸首。 > 使用异步的,支持增量的state backend (例如RocksDBStateBackend)会大大缓解该问题。 > 建议排查思路: > > 1. 检查使用的state backend类型 > 2. 检查是否存在sync阶段checkpoint用时久的问题(可以观察WEB UI上的checkpoint部分,看sync阶段的耗时) > 3. 借助jstack等工具,检查执行checkpoint时,TM上的task执行逻辑,判断是哪里消耗CPU > > 祝好 > 唐云 > ________________________________ > From: LakeShen <[hidden email]> > Sent: Wednesday, August 26, 2020 10:00 > To: user-zh <[hidden email]> > Subject: Re: flink checkpoint导致反压严重 > > Hi zhanglachun, > > 你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢 > > Best, > LakeShen > > 徐骁 <[hidden email]> 于2020年8月26日周三 上午2:10写道: > > > input > > .keyBy(<key selector>) > > .timeWindow(<duration>) > > .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); > > > > 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到 > > window 里面 > > > |
感谢大佬,现在基本可以确定是数据倾斜导致ck缓慢和反压严重
理由: 1.在webui查看计算子subtasks信息,3个subtask中其中一个的数据量只有其他两个的1/3 2.将key添加随机数后,计算性能直线上升(当然只是为了测试,结算结果显然不是预期的) 之前我做过分布聚合来解决缓解数据倾斜问题 需求:比如有两个字段:url,respontse_time,按url keyby,一分钟时间窗口,计算该url的响应时间(respontse_time)中位数 这里明显几个首页url的访问量会非常大,有些详情页url可能就访问量很小,这就肯定会有数据倾斜 我之前的分布聚合步骤是: 第一步聚合中:在url后添加1~50的整型随机数,keyby后使用RoaringBitmap缓存respontse_time. 结果类似于 (www.baidu.com1,[1,2,3]) (www.baidu.com2,[4,5,6]) 第二步聚合中:将url后缀去除,再次keyby后,将第一轮的RoaringBitmap缓存的数据整合 结果类似于 (www.baidu.com,[1,2,3,4,5,6]) 最后将RoaringBitmap里的全窗口数据计算最终中位数结果 www.baidu.com (3+4)/2=3.5 现在看来应该提升不佳,因为是计算中位数,就我目前对中位数的原理理解,无论如何都必须将全窗口数据缓存到窗口结束才能触发结果计算,也就是我上面的步骤,虽然有分步,但实际上只是分步缓存,而不是分步计算,不能像sum和count计算,可以在分步聚合中,逐步累加中间结果 对于这类有不能增量累加的数据倾斜场景,不知大佬有没有比较好的解决知道 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by zhanglachun
hi
我理解应该是数据倾斜的问题导致的 可以看下采用加随机数的方式key是否分布的均匀. -- Sent from: http://apache-flink.147419.n8.nabble.com/
Best Wishes
JasonLee |
Hi
如果我理解没错的话,这种 单 key 热点的问题,需要算 中位数(无法像 sum/count 这样分步计算的),只能通过现在你写的这种方法,先分布聚合,然后最终再计算中位数。不过或许可以找找数学方法,看有没有近似的算法 Best, Congxian 赵一旦 <[hidden email]> 于2020年9月1日周二 上午10:15写道: > (1)url理论上足够多,也足够随机。而并行度比如是30,url理论上是万、十万、百万、千万级别,理论上不会出现数据倾斜吧。 > (2)如果的确有倾斜,那么你那个方法我看不出有啥用,我看你好像是全缓存下来?这没啥用吧。 > (3)我的思路,考虑到你是要求1分钟窗口,每个url维度的,response的中位数。所以本质需要url+time维度的全部response数据排序。 > 由于url数量可能比较少(比如和并行度类似),导致了数据倾斜。所以key不能仅用url,需要分步。 > > > 分步方法:如果url的访问量总体极大,则response的值应该有很大重复,比如url1对应response=2ms的有1000个,对应3ms的有500个等这种量级。这样的话直接url+response作为key作为第一级统计也可以降低很大压力,同时加了response后应该就够分散了。第2级别拿到的是url级别的不同response+出现次数的数据。根据这些是可以计算中位数的,同时第二步压力也低,因为url1可能有1w流量,但response的不同值可能是100个。 > 前提背景:每个url的流量 >> 该url的response不同值(即具有相同response+url的流量不少)。 > > JasonLee <[hidden email]> 于2020年8月31日周一 下午7:31写道: > > > hi > > > > 我理解应该是数据倾斜的问题导致的 可以看下采用加随机数的方式key是否分布的均匀. > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
Free forum by Nabble | Edit this page |