flink checkpoint导致反压严重

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

flink checkpoint导致反压严重

zhanglachun
大佬们好,我一个flink任务,计算一分钟内的某项几项指标的中位数,总共5个指标,因为中位数计算需要全窗口数据排序,所以计算比较复杂,现在遇到的问题的是一旦开启checkpoint任务就从source端开始反压严重,但关闭checkpoint就正常运行.
目前优化的步骤有:
1.语义放弃exactlyonce 改到atleast
2.分析发现keyby过程中有数据倾斜,已改成分布聚合,在第一轮聚合中key后添加随机数,在去除key后缀进行第二轮聚合
3.计算过程中使用RoaringBitmap作为中间数据缓存容器,最大限度减少内存损耗
4.增大并行度,提交时增大-yjm -ytm 内存配置
5.调整ck间隔时间
经过以上一些优化,任务性能确实有提高,绝大部分时间能正常运行,但每到业务高峰期(本公司业务最高峰在每天上午10点,这个点的数据量是平常的7~8倍),反压就立马非常严重,直至任务挂掉,但也没有oom之类的有效的错误日志输出

以上优化在各个版本都有测试,1.9,1.10,1.11都是一样的问题,总的来说的就是ck导致任务反压严重,不开启ck时,就算不经过上面的优化也正常运行,包括10点的业务高峰点

请大佬们给点优化思路,目前生产只能放弃ck,但这终归不是解决办法



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink checkpoint导致反压严重

Xiao Xu
input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到
window 里面
Reply | Threaded
Open this post in threaded view
|

Re: flink checkpoint导致反压严重

LakeShen
Hi zhanglachun,

你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢

Best,
LakeShen

徐骁 <[hidden email]> 于2020年8月26日周三 上午2:10写道:

> input
>   .keyBy(<key selector>)
>   .timeWindow(<duration>)
>   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
>
> 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到
> window 里面
>
Reply | Threaded
Open this post in threaded view
|

Re: flink checkpoint导致反压严重

Yun Tang
Hi

对于已经改为at least once的checkpoint,其在checkpoint时对于作业吞吐的影响只有task 同步阶段的snapshot,这个时间段的snapshot由于与task的主线程的数据访问持有同一把锁,会影响主线程的数据处理。但是就算这样,我也很怀疑checkpoint本身并不是导致早上10点高峰期无法运行的罪魁祸首。
使用异步的,支持增量的state backend (例如RocksDBStateBackend)会大大缓解该问题。
建议排查思路:

  1.  检查使用的state backend类型
  2.  检查是否存在sync阶段checkpoint用时久的问题(可以观察WEB UI上的checkpoint部分,看sync阶段的耗时)
  3.  借助jstack等工具,检查执行checkpoint时,TM上的task执行逻辑,判断是哪里消耗CPU

祝好
唐云
________________________________
From: LakeShen <[hidden email]>
Sent: Wednesday, August 26, 2020 10:00
To: user-zh <[hidden email]>
Subject: Re: flink checkpoint导致反压严重

Hi zhanglachun,

你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢

Best,
LakeShen

徐骁 <[hidden email]> 于2020年8月26日周三 上午2:10写道:

> input
>   .keyBy(<key selector>)
>   .timeWindow(<duration>)
>   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
>
> 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到
> window 里面
>
Reply | Threaded
Open this post in threaded view
|

Re: flink checkpoint导致反压严重

Congxian Qiu
Hi
    对于开启 Checkpoint
之后导致反压的情况,如果希望在现在的基础上进行优化的话,则需要找到反压的原因是什么,可以尝试从最后一个被反压的算子开始排查,到底什么原因导致的,排查过程中,或许
Arthas[1] 可以有一些帮助
    另外比较好奇的是,为什么反压会导致你的作业挂掉呢?作业挂掉的原因是啥呢
[1] https://github.com/alibaba/arthas
Best,
Congxian


Yun Tang <[hidden email]> 于2020年8月26日周三 上午11:25写道:

> Hi
>
> 对于已经改为at least once的checkpoint,其在checkpoint时对于作业吞吐的影响只有task
> 同步阶段的snapshot,这个时间段的snapshot由于与task的主线程的数据访问持有同一把锁,会影响主线程的数据处理。但是就算这样,我也很怀疑checkpoint本身并不是导致早上10点高峰期无法运行的罪魁祸首。
> 使用异步的,支持增量的state backend (例如RocksDBStateBackend)会大大缓解该问题。
> 建议排查思路:
>
>   1.  检查使用的state backend类型
>   2.  检查是否存在sync阶段checkpoint用时久的问题(可以观察WEB UI上的checkpoint部分,看sync阶段的耗时)
>   3.  借助jstack等工具,检查执行checkpoint时,TM上的task执行逻辑,判断是哪里消耗CPU
>
> 祝好
> 唐云
> ________________________________
> From: LakeShen <[hidden email]>
> Sent: Wednesday, August 26, 2020 10:00
> To: user-zh <[hidden email]>
> Subject: Re: flink checkpoint导致反压严重
>
> Hi zhanglachun,
>
> 你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢
>
> Best,
> LakeShen
>
> 徐骁 <[hidden email]> 于2020年8月26日周三 上午2:10写道:
>
> > input
> >   .keyBy(<key selector>)
> >   .timeWindow(<duration>)
> >   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
> >
> > 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到
> > window 里面
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: flink checkpoint导致反压严重

zhanglachun
感谢大佬,现在基本可以确定是数据倾斜导致ck缓慢和反压严重
理由:
1.在webui查看计算子subtasks信息,3个subtask中其中一个的数据量只有其他两个的1/3
2.将key添加随机数后,计算性能直线上升(当然只是为了测试,结算结果显然不是预期的)

之前我做过分布聚合来解决缓解数据倾斜问题

需求:比如有两个字段:url,respontse_time,按url
keyby,一分钟时间窗口,计算该url的响应时间(respontse_time)中位数
这里明显几个首页url的访问量会非常大,有些详情页url可能就访问量很小,这就肯定会有数据倾斜

我之前的分布聚合步骤是:
第一步聚合中:在url后添加1~50的整型随机数,keyby后使用RoaringBitmap缓存respontse_time.
结果类似于
(www.baidu.com1,[1,2,3])
(www.baidu.com2,[4,5,6])
第二步聚合中:将url后缀去除,再次keyby后,将第一轮的RoaringBitmap缓存的数据整合
结果类似于
(www.baidu.com,[1,2,3,4,5,6])
最后将RoaringBitmap里的全窗口数据计算最终中位数结果
www.baidu.com  (3+4)/2=3.5

现在看来应该提升不佳,因为是计算中位数,就我目前对中位数的原理理解,无论如何都必须将全窗口数据缓存到窗口结束才能触发结果计算,也就是我上面的步骤,虽然有分步,但实际上只是分步缓存,而不是分步计算,不能像sum和count计算,可以在分步聚合中,逐步累加中间结果

对于这类有不能增量累加的数据倾斜场景,不知大佬有没有比较好的解决知道



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink checkpoint导致反压严重

JasonLee
In reply to this post by zhanglachun
hi

我理解应该是数据倾斜的问题导致的 可以看下采用加随机数的方式key是否分布的均匀.



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Best Wishes
JasonLee
Reply | Threaded
Open this post in threaded view
|

Re: flink checkpoint导致反压严重

Congxian Qiu
Hi
    如果我理解没错的话,这种 单 key 热点的问题,需要算 中位数(无法像 sum/count
这样分步计算的),只能通过现在你写的这种方法,先分布聚合,然后最终再计算中位数。不过或许可以找找数学方法,看有没有近似的算法
Best,
Congxian


赵一旦 <[hidden email]> 于2020年9月1日周二 上午10:15写道:

> (1)url理论上足够多,也足够随机。而并行度比如是30,url理论上是万、十万、百万、千万级别,理论上不会出现数据倾斜吧。
> (2)如果的确有倾斜,那么你那个方法我看不出有啥用,我看你好像是全缓存下来?这没啥用吧。
> (3)我的思路,考虑到你是要求1分钟窗口,每个url维度的,response的中位数。所以本质需要url+time维度的全部response数据排序。
>          由于url数量可能比较少(比如和并行度类似),导致了数据倾斜。所以key不能仅用url,需要分步。
>
>
>  分步方法:如果url的访问量总体极大,则response的值应该有很大重复,比如url1对应response=2ms的有1000个,对应3ms的有500个等这种量级。这样的话直接url+response作为key作为第一级统计也可以降低很大压力,同时加了response后应该就够分散了。第2级别拿到的是url级别的不同response+出现次数的数据。根据这些是可以计算中位数的,同时第二步压力也低,因为url1可能有1w流量,但response的不同值可能是100个。
>          前提背景:每个url的流量 >> 该url的response不同值(即具有相同response+url的流量不少)。
>
> JasonLee <[hidden email]> 于2020年8月31日周一 下午7:31写道:
>
> > hi
> >
> > 我理解应该是数据倾斜的问题导致的 可以看下采用加随机数的方式key是否分布的均匀.
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>