场景上:
目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。 目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。 目前测试了一版本flink sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。 所以产生以下想法,不知道可不可行? 1、用FsStateBackend:想堆机器,把任务里的用到的状态都用FsStateBackend放内存来搞(基于表的主键数有限,状态应该也是有限的)。 2、还用rocksdb:尽量把flink的manager内存(rocksdb用的)搞得较大,把读写rocksdb都尽量在内存中进行。 目前对flink sql生成的算子的状态情况不太熟悉,想问题下这两种方法是否可行呢? -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Administrator
|
建议大状态还是用 rocksdb,生产上会稳定很多。你说的这个量级感觉相差比较大,可能还没有对 rocksdb 调优导致的。
你可以参考下这几篇文章尝试调优下 rocksdb: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg Best, Jark On Wed, 9 Dec 2020 at 12:19, jindy_liu <[hidden email]> wrote: > 场景上: > > > 目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。 > > > 目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。 > 目前测试了一版本flink > > sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。 > > 所以产生以下想法,不知道可不可行? > > > 1、用FsStateBackend:想堆机器,把任务里的用到的状态都用FsStateBackend放内存来搞(基于表的主键数有限,状态应该也是有限的)。 > 2、还用rocksdb:尽量把flink的manager内存(rocksdb用的)搞得较大,把读写rocksdb都尽量在内存中进行。 > 目前对flink sql生成的算子的状态情况不太熟悉,想问题下这两种方法是否可行呢? > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Administrator
|
关于 rocksdb 的性能调优, @Yun Tang <[hidden email]> 会更清楚。
On Thu, 10 Dec 2020 at 11:04, Jark Wu <[hidden email]> wrote: > 建议大状态还是用 rocksdb,生产上会稳定很多。你说的这个量级感觉相差比较大,可能还没有对 rocksdb 调优导致的。 > > 你可以参考下这几篇文章尝试调优下 rocksdb: > > https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA > https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw > https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA > https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg > > > Best, > Jark > > On Wed, 9 Dec 2020 at 12:19, jindy_liu <[hidden email]> wrote: > >> 场景上: >> >> >> 目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。 >> >> >> 目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。 >> 目前测试了一版本flink >> >> sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。 >> >> 所以产生以下想法,不知道可不可行? >> >> >> 1、用FsStateBackend:想堆机器,把任务里的用到的状态都用FsStateBackend放内存来搞(基于表的主键数有限,状态应该也是有限的)。 >> 2、还用rocksdb:尽量把flink的manager内存(rocksdb用的)搞得较大,把读写rocksdb都尽量在内存中进行。 >> 目前对flink sql生成的算子的状态情况不太熟悉,想问题下这两种方法是否可行呢? >> >> >> >> >> >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/ > > |
Hi
FsStateBackend 在性能上是比 RocksDBStateBackend 好,这个是符合预期的。不过想要获得高性能的话,需要更多的jvm堆上内存,但是大内存场景下的GC会很痛苦,所以并不是说加内存之后,性能可以线性增长。 现在还有一个问题是你的状态有多大呢,可以去有状态的节点上看DB的大小(通过增量checkpoint的checkpointed data size也可以间接推出),看CPU使用情况,看磁盘的iostat,来找到具体的瓶颈在哪里。 祝好 唐云 ________________________________ From: Jark Wu <[hidden email]> Sent: Thursday, December 10, 2020 11:04 To: user-zh <[hidden email]> Cc: Yun Tang <[hidden email]> Subject: Re: 关于flink cdc的N流join状态后端的选择问题: FsStateBackend和RocksDBStateBackend 关于 rocksdb 的性能调优, @Yun Tang<mailto:[hidden email]> 会更清楚。 On Thu, 10 Dec 2020 at 11:04, Jark Wu <[hidden email]<mailto:[hidden email]>> wrote: 建议大状态还是用 rocksdb,生产上会稳定很多。你说的这个量级感觉相差比较大,可能还没有对 rocksdb 调优导致的。 你可以参考下这几篇文章尝试调优下 rocksdb: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg Best, Jark On Wed, 9 Dec 2020 at 12:19, jindy_liu <[hidden email]<mailto:[hidden email]>> wrote: 场景上: 目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。 目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。 目前测试了一版本flink sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。 所以产生以下想法,不知道可不可行? 1、用FsStateBackend:想堆机器,把任务里的用到的状态都用FsStateBackend放内存来搞(基于表的主键数有限,状态应该也是有限的)。 2、还用rocksdb:尽量把flink的manager内存(rocksdb用的)搞得较大,把读写rocksdb都尽量在内存中进行。 目前对flink sql生成的算子的状态情况不太熟悉,想问题下这两种方法是否可行呢? -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
flink
sql主要涉及到9张mysql表(snapshot+cdc),任务的解析后的算子较多,大概60~70个,但主要是join,和4~5个GroupAggregate算子,最后sink,sink不是瓶颈已经排除。 恩,已经调过几版参数了我的机型的配置是一样的,12核+24G内存 + ssd 50G,共6台(任务并行度设置为60,除去了flink mysql cdc流的并行度为1,其它算子并行度都为60) taskmgr的flink-conf主要参数为,其它为默认: taskmanager.numberOfTaskSlots: 10 taskmanager.memory.process.size: 20480m taskmanager.memory.managed.fraction: 0.75 taskmanager.memory.network.fraction: 0.2 taskmanager.memory.network.max: 4gb #算子大概需要3G左右的network buf taskmanager.memory.network.min: 128mb #13G state.backend.rocksdb.thread.num: 4 state.backend.rocksdb.writebuffer.count: 48 state.backend.rocksdb.writebuffer.size: 256M #12G state.backend.rocksdb.writebuffer.number-to-merge: 1 state.backend.rocksdb.block.cache-size: 2000M #2112M state.backend.rocksdb.block.blocksize: 64K state.backend.rocksdb.localdir: /opt/flink/rocksdb state.backend.rocksdb.files.open: 90000 这一板算是比较好的一版,但依然感觉比较慢。观察性能,6台的性能差异不大,都差不多,也没看到明显的数据倾斜。 起始阶段大部分数据都是写状态,启动阶段来看,是速率输入最快的时刻,后面越来越慢,慢慢的source算子就出现反压了。 观察一小时的数据如下,1小时后,每张表平均能跑个500w左右的数据,从观察来看,cpu在等待时间较大,但load又比较重,10核也就能跑个5核左右。硬盘的io也没到ssd的瓶颈,状态大小跑1个小时后,每台机器ssd硬盘上(state.backend.rocksdb.localdir)的状态大小也是2GB左右。 <http://apache-flink.147419.n8.nabble.com/file/t670/topolo.png> <http://apache-flink.147419.n8.nabble.com/file/t670/dstat.png> <http://apache-flink.147419.n8.nabble.com/file/t670/ssd.png> <http://apache-flink.147419.n8.nabble.com/file/t670/top.png> 这一版本参数里,主要是把state.backend.rocksdb.writebuffer.count调大了,在任务启动的时候,数据基本是都是写,比如把state.backend.rocksdb.writebuffer.count设为10的话,速度就更慢了,io util会到80%左右。 这个参数感觉不是很好调。 尝试用内存跑,5几分钟左右就能把每张表能跑个500w左右的数据跑完,基本没io,cpu都是满状态跑,并且算子是没有反压的,当然任务基本马上也就被oom-kill了,内存不够。 不知道这里有啥优化方法么?@yuntang -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
补充一个,当我把state.backend.rocksdb.writebuffer.count: 48调小到10的话,
jstack来看,从https://spotify.github.io/threaddump-analyzer/分析来看 top类的方法基本都在rocksdb的io上了。并且很多线程都在等待 <http://apache-flink.147419.n8.nabble.com/file/t670/stack.png> <http://apache-flink.147419.n8.nabble.com/file/t670/sleep.png> -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi
首先需要纠正一点的是,RocksDB的实际可用内存并不是你以为的13GB,因为从Flink-1.10 开始引入的 managed memory [1][2],会将slot上的RocksDB的实际可用内存限制在 managed memory / number of slots,也就是说对于你配置的10个slot,20GB的process memory,0.75的managed fraction,真实的per slot managed memory其实只有不到1.5GB,也就是说你配置的write buffer count以及max write buffer啥的并没有真正“生效”。RocksDB的write buffer manager会提前将write buffer 置为immutable并flush出去。应该增大 managed memory / number of slots 来增大单个slot内多个RocksDB的共享可用内存,来确保RocksDB的可用实际内存真的有效。 从你的栈看,很多时候卡在了数据put上,我怀疑是遇到了写阻塞 (write stall) [3],可以用async-profiler [4] 来观察RocksDB的内部相关调用栈。 另外,可以开启rocksDB的native metrics [5][6],观察RocksDB的写是不是经常被阻塞 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_tuning.html#rocksdb-state-backend [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/large_state_tuning.html#tuning-rocksdb-memory [3] https://github.com/facebook/rocksdb/wiki/Write-Stalls [4] https://github.com/jvm-profiling-tools/async-profiler [5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-backend-rocksdb-metrics-actual-delayed-write-rate [6] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-backend-rocksdb-metrics-is-write-stopped 祝好 唐云 ________________________________ From: jindy_liu <[hidden email]> Sent: Thursday, December 10, 2020 16:22 To: [hidden email] <[hidden email]> Subject: Re: 关于flink cdc的N流join状态后端的选择问题: FsStateBackend和RocksDBStateBackend 补充一个,当我把state.backend.rocksdb.writebuffer.count: 48调小到10的话, jstack来看,从https://spotify.github.io/threaddump-analyzer/分析来看 top类的方法基本都在rocksdb的io上了。并且很多线程都在等待 <http://apache-flink.147419.n8.nabble.com/file/t670/stack.png> <http://apache-flink.147419.n8.nabble.com/file/t670/sleep.png> -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by jindy_liu
感谢指正!这里我验证了下你的说法,确实我理解有误了,我以为这个参数write buffer count以及max write
buffer是taskmanager所有的slots的。从web ui来看,确实是很多算子都出现了is_stop_write。你的推断是正确的,老的配置参数下,看了下,确实经常出现is_stop_write=1的情况,然后线程就阻塞sleep了。 昨天调整了一版参数:改了下Slot为2,还是6台机器,job并发度设置成12。结果是之前的阻写没有了。跑一晚上10个小时左右,能跑21000W每张表的速度了,并且现在看也没有阻写的情况,硬盘的读写iops与util都很低,基本没有。但这个距离上线还是有差据,也就是6台机器只能处理5000/s的数据性能,有点低。 taskmanager.numberOfTaskSlots: 2 taskmanager.memory.process.size: 20480m taskmanager.memory.managed.fraction: 0.75 taskmanager.memory.network.fraction: 0.2 taskmanager.memory.network.max: 4gb #算子大概需要3G左右的network buf taskmanager.memory.network.min: 128mb #7G per slot writer + read state.backend.rocksdb.thread.num: 4 state.backend.rocksdb.writebuffer.count: 20 state.backend.rocksdb.writebuffer.size: 256M #5G state.backend.rocksdb.writebuffer.number-to-merge: 1 state.backend.rocksdb.block.cache-size: 2000M #2112M state.backend.rocksdb.block.blocksize: 64K state.backend.rocksdb.localdir: /opt/flink/rocksdb state.backend.rocksdb.files.open: 90000 查看些cpu很忙的机器,jstack发现性能开销都在读上了(跑21000W后),花在rocksdb.get读上较多,怎么看是读的内存还是磁盘来的?我看cpu比较忙的机器上,磁盘io读,基本没有了。看rocksdb本地dir所挂的ssd磁盘上的状态文件大小3台在7GB左右,别外3台在3GB左右(这里没法在web ui上看checkpointed datasize大小,目前由于没有成功过 ,mysql-cdc-connector会一直超时失败) @yuntang 这里看看rocksdb上还有提升空间和任务总体性能上还能有提升? ( 但出现1,2机器的cpu负载明显比其它低的情况,这个感觉可能还有另外一个问题,存在些倾斜???!!! 因为有些AGG算子,我开了代码调整了些,开了minibatch configuration.setString("table.exec.mini-batch.enabled", "true"); configuration.setString("table.exec.mini-batch.allow-latency", "10 s"); configuration.setString("table.exec.mini-batch.size", "10000"); ) <http://apache-flink.147419.n8.nabble.com/file/t670/top_reader.png> <http://apache-flink.147419.n8.nabble.com/file/t670/io.png> <http://apache-flink.147419.n8.nabble.com/file/t670/cdc_topology.png> -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi
在前面的邮件里面,已经提示可以使用 async-profiler [1] 来观察RocksDB的内部相关调用栈,这样能看到cpu是否在等待IO。另外iostat等工具可以看磁盘压力如何。至于数据倾斜,可以去看rocksDB的db目录大小,或者看各个subtask的input bytes大小,看看是否task间存在数据倾斜。 [1] https://github.com/jvm-profiling-tools/async-profiler 祝好 唐云 ________________________________ From: jindy_liu <[hidden email]> Sent: Friday, December 11, 2020 11:24 To: [hidden email] <[hidden email]> Subject: Re: 关于flink cdc的N流join状态后端的选择问题: FsStateBackend和RocksDBStateBackend 感谢指正!这里我验证了下你的说法,确实我理解有误了,我以为这个参数write buffer count以及max write buffer是taskmanager所有的slots的。从web ui来看,确实是很多算子都出现了is_stop_write。你的推断是正确的,老的配置参数下,看了下,确实经常出现is_stop_write=1的情况,然后线程就阻塞sleep了。 昨天调整了一版参数:改了下Slot为2,还是6台机器,job并发度设置成12。结果是之前的阻写没有了。跑一晚上10个小时左右,能跑21000W每张表的速度了,并且现在看也没有阻写的情况,硬盘的读写iops与util都很低,基本没有。但这个距离上线还是有差据,也就是6台机器只能处理5000/s的数据性能,有点低。 taskmanager.numberOfTaskSlots: 2 taskmanager.memory.process.size: 20480m taskmanager.memory.managed.fraction: 0.75 taskmanager.memory.network.fraction: 0.2 taskmanager.memory.network.max: 4gb #算子大概需要3G左右的network buf taskmanager.memory.network.min: 128mb #7G per slot writer + read state.backend.rocksdb.thread.num: 4 state.backend.rocksdb.writebuffer.count: 20 state.backend.rocksdb.writebuffer.size: 256M #5G state.backend.rocksdb.writebuffer.number-to-merge: 1 state.backend.rocksdb.block.cache-size: 2000M #2112M state.backend.rocksdb.block.blocksize: 64K state.backend.rocksdb.localdir: /opt/flink/rocksdb state.backend.rocksdb.files.open: 90000 查看些cpu很忙的机器,jstack发现性能开销都在读上了(跑21000W后),花在rocksdb.get读上较多,怎么看是读的内存还是磁盘来的?我看cpu比较忙的机器上,磁盘io读,基本没有了。看rocksdb本地dir所挂的ssd磁盘上的状态文件大小3台在7GB左右,别外3台在3GB左右(这里没法在web ui上看checkpointed datasize大小,目前由于没有成功过 ,mysql-cdc-connector会一直超时失败) @yuntang 这里看看rocksdb上还有提升空间和任务总体性能上还能有提升? ( 但出现1,2机器的cpu负载明显比其它低的情况,这个感觉可能还有另外一个问题,存在些倾斜???!!! 因为有些AGG算子,我开了代码调整了些,开了minibatch configuration.setString("table.exec.mini-batch.enabled", "true"); configuration.setString("table.exec.mini-batch.allow-latency", "10 s"); configuration.setString("table.exec.mini-batch.size", "10000"); ) <http://apache-flink.147419.n8.nabble.com/file/t670/top_reader.png> <http://apache-flink.147419.n8.nabble.com/file/t670/io.png> <http://apache-flink.147419.n8.nabble.com/file/t670/cdc_topology.png> -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |