关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

jindy_liu
场景上:
   
目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。
   
目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。
    目前测试了一版本flink
sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。

     所以产生以下想法,不知道可不可行?
   
1、用FsStateBackend:想堆机器,把任务里的用到的状态都用FsStateBackend放内存来搞(基于表的主键数有限,状态应该也是有限的)。
    2、还用rocksdb:尽量把flink的manager内存(rocksdb用的)搞得较大,把读写rocksdb都尽量在内存中进行。
    目前对flink sql生成的算子的状态情况不太熟悉,想问题下这两种方法是否可行呢?
   




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

Jark
Administrator
建议大状态还是用 rocksdb,生产上会稳定很多。你说的这个量级感觉相差比较大,可能还没有对 rocksdb 调优导致的。

你可以参考下这几篇文章尝试调优下 rocksdb:

https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw
https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA
https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg


Best,
Jark

On Wed, 9 Dec 2020 at 12:19, jindy_liu <[hidden email]> wrote:

> 场景上:
>
>
> 目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。
>
>
> 目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。
>     目前测试了一版本flink
>
> sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。
>
>      所以产生以下想法,不知道可不可行?
>
>
> 1、用FsStateBackend:想堆机器,把任务里的用到的状态都用FsStateBackend放内存来搞(基于表的主键数有限,状态应该也是有限的)。
>     2、还用rocksdb:尽量把flink的manager内存(rocksdb用的)搞得较大,把读写rocksdb都尽量在内存中进行。
>     目前对flink sql生成的算子的状态情况不太熟悉,想问题下这两种方法是否可行呢?
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

Jark
Administrator
关于 rocksdb 的性能调优, @Yun Tang <[hidden email]> 会更清楚。

On Thu, 10 Dec 2020 at 11:04, Jark Wu <[hidden email]> wrote:

> 建议大状态还是用 rocksdb,生产上会稳定很多。你说的这个量级感觉相差比较大,可能还没有对 rocksdb 调优导致的。
>
> 你可以参考下这几篇文章尝试调优下 rocksdb:
>
> https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
> https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw
> https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA
> https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg
>
>
> Best,
> Jark
>
> On Wed, 9 Dec 2020 at 12:19, jindy_liu <[hidden email]> wrote:
>
>> 场景上:
>>
>>
>> 目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。
>>
>>
>> 目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。
>>     目前测试了一版本flink
>>
>> sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。
>>
>>      所以产生以下想法,不知道可不可行?
>>
>>
>> 1、用FsStateBackend:想堆机器,把任务里的用到的状态都用FsStateBackend放内存来搞(基于表的主键数有限,状态应该也是有限的)。
>>     2、还用rocksdb:尽量把flink的manager内存(rocksdb用的)搞得较大,把读写rocksdb都尽量在内存中进行。
>>     目前对flink sql生成的算子的状态情况不太熟悉,想问题下这两种方法是否可行呢?
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

Yun Tang
Hi

FsStateBackend 在性能上是比 RocksDBStateBackend 好,这个是符合预期的。不过想要获得高性能的话,需要更多的jvm堆上内存,但是大内存场景下的GC会很痛苦,所以并不是说加内存之后,性能可以线性增长。

现在还有一个问题是你的状态有多大呢,可以去有状态的节点上看DB的大小(通过增量checkpoint的checkpointed data size也可以间接推出),看CPU使用情况,看磁盘的iostat,来找到具体的瓶颈在哪里。

祝好
唐云

________________________________
From: Jark Wu <[hidden email]>
Sent: Thursday, December 10, 2020 11:04
To: user-zh <[hidden email]>
Cc: Yun Tang <[hidden email]>
Subject: Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

关于 rocksdb 的性能调优, @Yun Tang<mailto:[hidden email]> 会更清楚。

On Thu, 10 Dec 2020 at 11:04, Jark Wu <[hidden email]<mailto:[hidden email]>> wrote:
建议大状态还是用 rocksdb,生产上会稳定很多。你说的这个量级感觉相差比较大,可能还没有对 rocksdb 调优导致的。

你可以参考下这几篇文章尝试调优下 rocksdb:

https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw
https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA
https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg


Best,
Jark

On Wed, 9 Dec 2020 at 12:19, jindy_liu <[hidden email]<mailto:[hidden email]>> wrote:
场景上:

目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。

目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。
    目前测试了一版本flink
sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。

     所以产生以下想法,不知道可不可行?

1、用FsStateBackend:想堆机器,把任务里的用到的状态都用FsStateBackend放内存来搞(基于表的主键数有限,状态应该也是有限的)。
    2、还用rocksdb:尽量把flink的manager内存(rocksdb用的)搞得较大,把读写rocksdb都尽量在内存中进行。
    目前对flink sql生成的算子的状态情况不太熟悉,想问题下这两种方法是否可行呢?





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

jindy_liu
flink
sql主要涉及到9张mysql表(snapshot+cdc),任务的解析后的算子较多,大概60~70个,但主要是join,和4~5个GroupAggregate算子,最后sink,sink不是瓶颈已经排除。

恩,已经调过几版参数了我的机型的配置是一样的,12核+24G内存 + ssd 50G,共6台(任务并行度设置为60,除去了flink mysql
cdc流的并行度为1,其它算子并行度都为60)
taskmgr的flink-conf主要参数为,其它为默认:
taskmanager.numberOfTaskSlots: 10
taskmanager.memory.process.size: 20480m
taskmanager.memory.managed.fraction: 0.75
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.max: 4gb  #算子大概需要3G左右的network buf
taskmanager.memory.network.min: 128mb

#13G
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.writebuffer.count: 48
state.backend.rocksdb.writebuffer.size: 256M   #12G
state.backend.rocksdb.writebuffer.number-to-merge: 1
state.backend.rocksdb.block.cache-size: 2000M  #2112M
state.backend.rocksdb.block.blocksize: 64K
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.files.open: 90000

这一板算是比较好的一版,但依然感觉比较慢。观察性能,6台的性能差异不大,都差不多,也没看到明显的数据倾斜。
起始阶段大部分数据都是写状态,启动阶段来看,是速率输入最快的时刻,后面越来越慢,慢慢的source算子就出现反压了。

观察一小时的数据如下,1小时后,每张表平均能跑个500w左右的数据,从观察来看,cpu在等待时间较大,但load又比较重,10核也就能跑个5核左右。硬盘的io也没到ssd的瓶颈,状态大小跑1个小时后,每台机器ssd硬盘上(state.backend.rocksdb.localdir)的状态大小也是2GB左右。
<http://apache-flink.147419.n8.nabble.com/file/t670/topolo.png>
<http://apache-flink.147419.n8.nabble.com/file/t670/dstat.png>
<http://apache-flink.147419.n8.nabble.com/file/t670/ssd.png>
<http://apache-flink.147419.n8.nabble.com/file/t670/top.png>

这一版本参数里,主要是把state.backend.rocksdb.writebuffer.count调大了,在任务启动的时候,数据基本是都是写,比如把state.backend.rocksdb.writebuffer.count设为10的话,速度就更慢了,io
util会到80%左右。
这个参数感觉不是很好调。

尝试用内存跑,5几分钟左右就能把每张表能跑个500w左右的数据跑完,基本没io,cpu都是满状态跑,并且算子是没有反压的,当然任务基本马上也就被oom-kill了,内存不够。

不知道这里有啥优化方法么?@yuntang






--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

jindy_liu
补充一个,当我把state.backend.rocksdb.writebuffer.count: 48调小到10的话,

jstack来看,从https://spotify.github.io/threaddump-analyzer/分析来看

top类的方法基本都在rocksdb的io上了。并且很多线程都在等待
<http://apache-flink.147419.n8.nabble.com/file/t670/stack.png>

<http://apache-flink.147419.n8.nabble.com/file/t670/sleep.png>





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

Yun Tang
Hi

首先需要纠正一点的是,RocksDB的实际可用内存并不是你以为的13GB,因为从Flink-1.10 开始引入的 managed memory [1][2],会将slot上的RocksDB的实际可用内存限制在 managed memory / number of slots,也就是说对于你配置的10个slot,20GB的process memory,0.75的managed fraction,真实的per slot managed memory其实只有不到1.5GB,也就是说你配置的write buffer count以及max write buffer啥的并没有真正“生效”。RocksDB的write buffer manager会提前将write buffer 置为immutable并flush出去。应该增大 managed memory / number of slots 来增大单个slot内多个RocksDB的共享可用内存,来确保RocksDB的可用实际内存真的有效。
从你的栈看,很多时候卡在了数据put上,我怀疑是遇到了写阻塞 (write stall) [3],可以用async-profiler [4] 来观察RocksDB的内部相关调用栈。
另外,可以开启rocksDB的native metrics [5][6],观察RocksDB的写是不是经常被阻塞


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_tuning.html#rocksdb-state-backend
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/large_state_tuning.html#tuning-rocksdb-memory
[3] https://github.com/facebook/rocksdb/wiki/Write-Stalls
[4] https://github.com/jvm-profiling-tools/async-profiler
[5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-backend-rocksdb-metrics-actual-delayed-write-rate
[6] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-backend-rocksdb-metrics-is-write-stopped

祝好
唐云
________________________________
From: jindy_liu <[hidden email]>
Sent: Thursday, December 10, 2020 16:22
To: [hidden email] <[hidden email]>
Subject: Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

补充一个,当我把state.backend.rocksdb.writebuffer.count: 48调小到10的话,

jstack来看,从https://spotify.github.io/threaddump-analyzer/分析来看

top类的方法基本都在rocksdb的io上了。并且很多线程都在等待
<http://apache-flink.147419.n8.nabble.com/file/t670/stack.png>

<http://apache-flink.147419.n8.nabble.com/file/t670/sleep.png>





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

jindy_liu
In reply to this post by jindy_liu
感谢指正!这里我验证了下你的说法,确实我理解有误了,我以为这个参数write buffer count以及max write
buffer是taskmanager所有的slots的。从web
ui来看,确实是很多算子都出现了is_stop_write。你的推断是正确的,老的配置参数下,看了下,确实经常出现is_stop_write=1的情况,然后线程就阻塞sleep了。

昨天调整了一版参数:改了下Slot为2,还是6台机器,job并发度设置成12。结果是之前的阻写没有了。跑一晚上10个小时左右,能跑21000W每张表的速度了,并且现在看也没有阻写的情况,硬盘的读写iops与util都很低,基本没有。但这个距离上线还是有差据,也就是6台机器只能处理5000/s的数据性能,有点低。

taskmanager.numberOfTaskSlots: 2
taskmanager.memory.process.size: 20480m
taskmanager.memory.managed.fraction: 0.75
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.max: 4gb  #算子大概需要3G左右的network buf
taskmanager.memory.network.min: 128mb

#7G per slot writer + read
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.writebuffer.count: 20
state.backend.rocksdb.writebuffer.size: 256M   #5G
state.backend.rocksdb.writebuffer.number-to-merge: 1
state.backend.rocksdb.block.cache-size: 2000M  #2112M
state.backend.rocksdb.block.blocksize: 64K
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.files.open: 90000

查看些cpu很忙的机器,jstack发现性能开销都在读上了(跑21000W后),花在rocksdb.get读上较多,怎么看是读的内存还是磁盘来的?我看cpu比较忙的机器上,磁盘io读,基本没有了。看rocksdb本地dir所挂的ssd磁盘上的状态文件大小3台在7GB左右,别外3台在3GB左右(这里没法在web
ui上看checkpointed datasize大小,目前由于没有成功过 ,mysql-cdc-connector会一直超时失败)

@yuntang 这里看看rocksdb上还有提升空间和任务总体性能上还能有提升?

但出现1,2机器的cpu负载明显比其它低的情况,这个感觉可能还有另外一个问题,存在些倾斜???!!!
因为有些AGG算子,我开了代码调整了些,开了minibatch
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "10 s");
configuration.setString("table.exec.mini-batch.size", "10000");


<http://apache-flink.147419.n8.nabble.com/file/t670/top_reader.png>
<http://apache-flink.147419.n8.nabble.com/file/t670/io.png>
<http://apache-flink.147419.n8.nabble.com/file/t670/cdc_topology.png>



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

Yun Tang
Hi

在前面的邮件里面,已经提示可以使用 async-profiler [1] 来观察RocksDB的内部相关调用栈,这样能看到cpu是否在等待IO。另外iostat等工具可以看磁盘压力如何。至于数据倾斜,可以去看rocksDB的db目录大小,或者看各个subtask的input bytes大小,看看是否task间存在数据倾斜。


[1] https://github.com/jvm-profiling-tools/async-profiler

祝好
唐云
________________________________
From: jindy_liu <[hidden email]>
Sent: Friday, December 11, 2020 11:24
To: [hidden email] <[hidden email]>
Subject: Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

感谢指正!这里我验证了下你的说法,确实我理解有误了,我以为这个参数write buffer count以及max write
buffer是taskmanager所有的slots的。从web
ui来看,确实是很多算子都出现了is_stop_write。你的推断是正确的,老的配置参数下,看了下,确实经常出现is_stop_write=1的情况,然后线程就阻塞sleep了。

昨天调整了一版参数:改了下Slot为2,还是6台机器,job并发度设置成12。结果是之前的阻写没有了。跑一晚上10个小时左右,能跑21000W每张表的速度了,并且现在看也没有阻写的情况,硬盘的读写iops与util都很低,基本没有。但这个距离上线还是有差据,也就是6台机器只能处理5000/s的数据性能,有点低。

taskmanager.numberOfTaskSlots: 2
taskmanager.memory.process.size: 20480m
taskmanager.memory.managed.fraction: 0.75
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.max: 4gb  #算子大概需要3G左右的network buf
taskmanager.memory.network.min: 128mb

#7G per slot writer + read
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.writebuffer.count: 20
state.backend.rocksdb.writebuffer.size: 256M   #5G
state.backend.rocksdb.writebuffer.number-to-merge: 1
state.backend.rocksdb.block.cache-size: 2000M  #2112M
state.backend.rocksdb.block.blocksize: 64K
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.files.open: 90000

查看些cpu很忙的机器,jstack发现性能开销都在读上了(跑21000W后),花在rocksdb.get读上较多,怎么看是读的内存还是磁盘来的?我看cpu比较忙的机器上,磁盘io读,基本没有了。看rocksdb本地dir所挂的ssd磁盘上的状态文件大小3台在7GB左右,别外3台在3GB左右(这里没法在web
ui上看checkpointed datasize大小,目前由于没有成功过 ,mysql-cdc-connector会一直超时失败)

@yuntang 这里看看rocksdb上还有提升空间和任务总体性能上还能有提升?

但出现1,2机器的cpu负载明显比其它低的情况,这个感觉可能还有另外一个问题,存在些倾斜???!!!
因为有些AGG算子,我开了代码调整了些,开了minibatch
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "10 s");
configuration.setString("table.exec.mini-batch.size", "10000");


<http://apache-flink.147419.n8.nabble.com/file/t670/top_reader.png>
<http://apache-flink.147419.n8.nabble.com/file/t670/io.png>
<http://apache-flink.147419.n8.nabble.com/file/t670/cdc_topology.png>



--
Sent from: http://apache-flink.147419.n8.nabble.com/