This post was updated on .
Dear all,
我有两个Flink Job A和B A job任务是消费kafka topic01数据,经过一系列逻辑加工,最终将数据sink到topic02 其中加工大致过程是:消费到topic01消息后,根据数据相关字段查询redis、查询hbase,然后组装业务数据(过程比较复杂),然后将业务数据写到另一个topic02,30s做一次checkpoint,state大小只有几十kb,但做一次checkpoint平均需要两分钟,导致topic01消息产生堆积,实时性降低。 B job任务简单,消费上一步的的业务数据topic02,开一个半个小时的窗口将数据进行聚合(keyby、max)之后写到orc file,state大小几百兆,但耗时是秒级别。 我比较疑惑的是为什么A job的state那么小,但checkpoint却很耗时,不知道从哪个角度去优化该问题。 A job的某次checkpoint过程如下图: 并不是sync或者async的速度慢,是某个slot整个过程就慢(4、9、13、25),导致整个checkpoint变慢 checkpoint是用rocksdb存储 请各位指教 ----- Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks!
Jacob |
排查一下任务在执行过程中,是否有背压,以及在ck过程中,buffer积压了多少数据量。
很可能是在访问hbase的过程,性能不是很好。 在2021年06月03日 15:27,Jacob 写道: Dear all, 我有一个两个Flink Job A和B A job任务是消费kafka topic01数据,经过一系列逻辑加工,最终将数据sink到topic02 其中加工大致过程是:消费到topic01消息后,根据数据相关字段查询redis、查询hbase,然后组装业务数据(过程比较复杂),然后将业务数据写到另一个topic02,30s做一次checkpoint,state大小只有几十kb,但做一次checkpoint平均需要两分钟,导致topic01消息产生堆积,实时性降低。 B job任务简单,消费上一步的的业务数据topic02,开一个半个小时的窗口将数据进行聚合(keyby、max)之后写到orc file,state大小几百兆,但耗时是秒级别。 我比较疑惑的是为什么A job的state那么小,但checkpoint却很耗时,不知道从哪个角度去优化该问题。 请各位指教 ----- Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
@lian 谢谢回复
我通过webui查询,没有背压的情况。 hbase性能这块确实存在一定问题,公司的hbase性能一直不佳,但我的程序中,是先从缓存redis中取,大部分数据都能查到,只有少部分会查hbase。 谢谢你提供的思路,我将会从代码级别考虑,看是否能优化相关逻辑 ----- Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks!
Jacob |
In reply to this post by lian
@lian 谢谢回复
我通过webui查询,没有背压的情况。 hbase性能这块确实存在一定问题,公司的hbase性能一直不佳,但我的程序中,是先从缓存redis中取,大部分数据都能查到,只有少部分会查hbase。 谢谢你提供的思路,我将会从代码级别考虑,看是否能优化相关逻辑 ----- Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks!
Jacob |
In reply to this post by Jacob
hi
你理解的可能有点偏差,应该是因为任务出现了反压或者数据倾斜的问题导致了cp时间长,01消息堆积说明已经反压到source端了,需要先定位反压的位置,看是具体什么原因导致的,然后再根据情况解决. ----- Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Best Wishes
JasonLee |
In reply to this post by Jacob
一般是 Job A出现背压了,checkpoint的时候是要等背压的数据都处理完了才会处理barrier。
-- Sent from: http://apache-flink.147419.n8.nabble.com/ |
This post was updated on .
In reply to this post by JasonLee
@JasonLee 谢谢回复
A job 的背压情况如下图 <http://apache-flink.147419.n8.nabble.com/file/t1162/backpressure.png> 我清楚job处理数据速度的确赶不上消息的生产速度这一事实,但暂时想不到一个合理的解决办法,并行度都已经设置的比较大了(从等于topic分区数量已经调整为大于partition数量了)。 我把各个task的并行度设置是一样的,让他们链在一个task上,从而优化线程切换的性能 其中 Map算子是最耗时的,所有的逻辑和数据加工都在这个Map算子,前后两个Flat Map 都是简单的将List数据扁平化而已,没有什么耗时操作。开始它们的并行度我设25(topic01 partition数量、消息速率是:1000~2000条/s),后面直接改成80,但并没有明显效果。 ---------------------Update--------------------- 当我把各个算子并行度设置不一样的时候,前两个算子(source、flatmap)确实出现反压情况了 ----- Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks!
Jacob |
hi
source 端的并发保持和 partition 的个数一样就行了,不要大于 partition 个数,因为这会导致 subtask 空跑,浪费资源,你只需要把 map 的并行度调大即可. ----- Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Best Wishes
JasonLee |
你任务A中的redis和hbase是异步还是同步访问,同步是肯定不行的。ckpt小是因为没啥状态,自然就小,时间长可能是数据对齐时间长,你估计用的是对齐检查点是吧?
如果换成非对齐检查点,时间应该能降下来,但是状态会变得很大,你可以试试。 最佳做法是,改造成异步的,不能同步。 JasonLee <[hidden email]> 于2021年6月4日周五 上午10:57写道: > > hi > > source 端的并发保持和 partition 的个数一样就行了,不要大于 partition 个数,因为这会导致 subtask > 空跑,浪费资源,你只需要把 map 的并行度调大即可. > > > > ----- > Best Wishes > JasonLee > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
@nobleyd 谢谢回复
你任务A中的redis和hbase是异步还是同步访问,------------------- 同步 你估计用的是对齐检查点是吧? -------------------是的 同步访问,是因为我们要及时生成新数据,换做异步就无法即时拿到最新的结果数据了 检查点我刚调整为非对齐方式了,从做完的十个checkpoint来看,state大小确实增加了,但速度尚未变快 消息量确实比较大,处理逻辑也较为复杂,处理逻辑算子的并行度我给了100,source并行度等于topic分区数 ----- Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks!
Jacob |
我懂你意思,每个输入数据,经过redis、hbase等访问,以及相关调整(比如字段设置等),然后这个记录需要继续作为此算子的输出是吧。
我表达的是指你需要用异步访问redis、hbase方式,这个配合flink的异步算子去实现。所以你说的那个需求基于异步的是可以满足的。 Jacob <[hidden email]> 于2021年6月4日周五 下午3:21写道: > > @nobleyd 谢谢回复 > > 你任务A中的redis和hbase是异步还是同步访问,------------------- 同步 > > 你估计用的是对齐检查点是吧? -------------------是的 > > > 同步访问,是因为我们要及时生成新数据,换做异步就无法即时拿到最新的结果数据了 > > 检查点我刚调整为非对齐方式了,从做完的十个checkpoint来看,state大小确实增加了,但速度尚未变快 > > > 消息量确实比较大,处理逻辑也较为复杂,处理逻辑算子的并行度我给了100,source并行度等于topic分区数 > > > > ----- > Thanks! > Jacob > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
This post was updated on .
嗯嗯 你的描述是对的,job的执行过程大致就是如此
我明白你意思了 谢谢你提供的思路,我需要学习一下这个异步算子,之前从未接触过。 我交互redis和hbase时通过lettuce、hbase-client实现的,并未通过flink相关connector 不太清楚这具体是一个怎样的流程,请问你那边有相关的demo吗,或者该去具体去看哪部分的内容? ----- Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks!
Jacob |
官方就有文档。其实本质就是一个异步操作假设1ms,那么同步操作的1s也就能1000个操作,qps太低了。异步的话可以大大提高qps。
Jacob <[hidden email]> 于2021年6月4日周五 下午5:58写道: > > 嗯嗯 你的描述是对的,job的执行过程大致就是如此 > > > 我明白你意思了 > > 谢谢你提供的思路,我需要学习一下这个异步算子,之前从未接触过,不太清楚这具体是一个怎样的流程,请问你那边有相关的demo吗,或者该去具体去看哪部分的内容? > > > > > > ----- > Thanks! > Jacob > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
This post was updated on .
thanks,
我查看了相关文档[1] 由于redis以及hbase的交互地方比较多,而且还有其他逻辑。 我打算把之前map算子的整段逻辑以线程池的形式丢在asyncInvoke()方法内部,不知道合适与否,这样数据的顺序性就无法得到保障了吧? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/ <https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/> ----- Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks!
Jacob |
可以的,本身异步操作的本质就是线程池。 至于是你自己提供线程池,去执行某个同步操作。还是直接使用client/sdk等封装的异步方法内部默认的线程池这个无所谓。
Jacob <[hidden email]> 于2021年6月5日周六 下午1:15写道: > > thanks, > > 我查看了相关文档[1] 由于redis以及hbase的交互地方比较多,比较零散,不光是查询,还有回写redis > > 我打算把之前map算子的整段逻辑以线程池的形式丢在asyncInvoke()方法内部,不知道合适与否,这样数据的顺序性就无法得到保障了吧? > > > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/ > <https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/> > > > > ----- > Thanks! > Jacob > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
@nobleyd
谢谢大神指导,前两天休息没看邮件,才回复,抱歉 我后面把代码大概改成如下样子,checkpoint时间确实得到了改善,job运行几天正常 我提供的线程池在asyncInvoke方法内部跑,这样是不是不合适呀,asyncInvoke方法本身是不是就是封装好的异步方法,就不用单独启线程池了吧?直接在asyncInvoke方法内部写处理逻辑就好。 public class AsyncProcessFunction extends RichAsyncFunction<Map<String, String>, List<JSONObject>> { private transient ExecutorService executorpool; @Override public void open(Configuration parameters) throws Exception { executorpool= Executors.newFixedThreadPool(80); } @Override public void asyncInvoke(Map<String, String> message, ResultFuture<List<JSONObject>> resultFuture){ executorpool.submit(()->{ // 处理逻辑 .............. resultFuture.complete(Collections.singletonList(...)); }); } } ----- Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks!
Jacob |
不是的哈,那个方法本身还是同步调用的。就是需要你自己保证逻辑的异步。
Jacob <[hidden email]> 于2021年6月8日周二 上午9:31写道: > > @nobleyd > 谢谢大神指导,前两天休息没看邮件,才回复,抱歉 > > 我后面把代码大概改成如下样子,checkpoint时间确实得到了改善,job运行几天正常 > > 我提供的线程池在asyncInvoke方法内部跑,这样是不是不合适呀,asyncInvoke方法本身是不是就是封装好的异步方法,就不用单独启线程池了吧?直接在asyncInvoke方法内部写处理逻辑就好。 > > public class AsyncProcessFunction extends RichAsyncFunction<Map<String, > String>, List<JSONObject>> { > > private transient ExecutorService executorpool; > > @Override > public void open(Configuration parameters) throws Exception { > executorpool= Executors.newFixedThreadPool(80); > } > > @Override > public void asyncInvoke(Map<String, String> message, > ResultFuture<List<JSONObject>> resultFuture){ > executorpool.submit(()->{ > // 处理逻辑 > .............. > resultFuture.complete(Collections.singletonList(...)); > }); > } > } > > > > ----- > Thanks! > Jacob > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
嗯嗯
明白了,感谢大神最近的指导! ----- Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks!
Jacob |
Free forum by Nabble | Edit this page |