flink sql消费kafka join普通表为何会性能爬坡?

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

flink sql消费kafka join普通表为何会性能爬坡?

Xi Shen
大家好,


flink sql消费kafka join普通表是会性能爬坡吗?


背景是flink 1.12.0 使用flink sql在yarn per-job发布,消费kafka topic=trades,然后join 数据库里的维表 shop_meta
现在发现每次重启flink sql job,或上游突然增加大量写入时,flink sql的消费速度总是慢慢增加上来,这样就会造成上游积压,等flink sql消费速度上来之后才能慢慢把积压消费完毕。


更多的信息:
trades是avro格式,大概有10个字段,但其中有一个字段full_info是一个大json,我这边写了处理json的UDF,就为每个字段都需要处理那个大json。最后生成将近25个字段写下游kafka
shop_meta是普通表,没有时间字段,总共有12个字段,30000行左右。整个表数据和索引加起来是16MB;更新频率非常低。现在读jdbc的配置为lookup.cache.max-rows = 20000;lookup.cache.ttl = 2h;scan.fetch-size = 1000
SQL示例如下
```
SELECT
t.shop_id, s.shop_name,
        ...
CAST(json_path_to_str(full_info, '$.response.trade.price', '0.0') AS DOUBLE) price, "
CAST(json_path_to_str(full_info, '$.response.trade.payment', '0.0') AS DOUBLE) payment, "
CAST(json_path_to_str(full_info, '$.response.trade.total_fee', '0.0') AS DOUBLE) total_fee, "
CAST(json_path_to_str(full_info, '$.response.trade.discount_fee', '0.0') AS DOUBLE) discount_fee, "
CAST(json_path_to_str(full_info, '$.response.trade.adjust_fee', '0.0') AS DOUBLE) adjust_fee, "
CAST(json_path_to_str(full_info, '$.response.trade.received_payment', '0.0') AS DOUBLE) received_payment, "
CAST(json_path_to_str(full_info, '$.response.trade.post_fee', '0.0') AS DOUBLE) post_fee, "
json_path_to_str(full_info, '$.response.trade.receiver_name', '') receiver_name, "
json_path_to_str(full_info, '$.response.trade.receiver_country', '') receiver_country, "
json_path_to_str(full_info, '$.response.trade.receiver_state', '') receiver_state, "
json_path_to_str(full_info, '$.response.trade.receiver_city', '') receiver_city, "
FROM trades t LEFT JOIN shop_meta FOR SYSTEM_TIME AS OF t.proc_time AS s
ON t.shop_id=s.shop_id
```


考虑到整个job里只有简单的ETL,不涉及中间状态,flink对task_manager的配置为
taskmanager.memory.managed.fraction = 0.1
taskmanager.memory.network.fraction = 0.05
实际运行中,task_manager总内存为6G,6 slots,最大并行度为6,所以只有一个task manager。
在监控页面看到task heap=4.13 GB,实际使用heap_used指标比较稳定。
在监控页面中可以看到随着消费速度越来越快,task manager CPU利用率越来越高,KafkaConsumer_topic_partition_currentOffsets - KafkaConsumer_topic_partition_committedOffsets 也在随着消费速度上涨,新生代GC次数和时间也在上涨
当消费完积压后,前两个指标降低,新生代GC趋于平稳


请问有什么调查或解决的方向吗?
谢谢大家
Reply | Threaded
Open this post in threaded view
|

Re: flink sql消费kafka join普通表为何会性能爬坡?

Xi Shen
为了测试到底是因为SQL里的parse json导致性能瓶颈,还是因为join维表
我在SQL中去掉join维表之后重启,发现只需要70s即可达到消费速度=3.8k,整个因为重启导致的积压被压缩到3分钟

所以应该是维表JOIN的问题
现在连的数据库是TiDB,连接串属性为
useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

回复: flink sql消费kafka join普通表为何会性能爬坡?

飞翔
使用Tidb作为维表库,想利用Tidb的性能,提高维表的实时性,但是每个来一条数据,你都需要查询一次,即使是TiDB也会存在性能瓶颈,我以前structured-streaming就体会过。
这个可以考虑定期广播维表去提供性能




------------------ 原始邮件 ------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2021年4月22日(星期四) 上午10:50
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: flink sql消费kafka join普通表为何会性能爬坡?



为了测试到底是因为SQL里的parse json导致性能瓶颈,还是因为join维表
我在SQL中去掉join维表之后重启,发现只需要70s即可达到消费速度=3.8k,整个因为重启导致的积压被压缩到3分钟

所以应该是维表JOIN的问题
现在连的数据库是TiDB,连接串属性为
useUnicode=true&amp;characterEncoding=UTF-8&amp;serverTimezone=Asia/Shanghai&amp;rewriteBatchedStatements=true



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 回复: flink sql消费kafka join普通表为何会性能爬坡?

Xi Shen
读JDBC table是有缓存的,看了源码,是用Guava cache实现

文档上说,整个Task Manager进程共享使用一个Cache,所以应该和广播的效果是一样的?所以应该不是查询TiDB导致的性能问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 回复: flink sql消费kafka join普通表为何会性能爬坡?

Xi Shen
In reply to this post by 飞翔
Cache设置大小为2w,超时时间为2h
实际上整个表大小为3w左右,考虑到整个表实际只有十几兆。我会尝试cache size设置为4w,保证整个表都能装进cache里。看会不会好一点


但是我查到现在怀疑跟savepoint有关:
- 如果我设置kafka offset=earliest,不带savepoint重启,flink
job启动消费时,lag有5000w左右,但是1分钟内就能达到约7k/s的消费速度。如下图,job在14:31启动,前面的速度特别大是因为offset重置,但是在14:33已经达到7.5k的消费速度
<http://apache-flink.147419.n8.nabble.com/file/t1249/fast.png>
- 但是如果带savepoint启动,需要花35min才能达到这个消费速度。如下图
<http://apache-flink.147419.n8.nabble.com/file/t1249/slow.png>



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re:回复: flink sql消费kafka join普通表为何会性能爬坡?

Michael Ran
In reply to this post by 飞翔
你得加本地缓存之类的呗
在 2021-04-22 11:21:55,"飞翔" <[hidden email]> 写道:

>使用Tidb作为维表库,想利用Tidb的性能,提高维表的实时性,但是每个来一条数据,你都需要查询一次,即使是TiDB也会存在性能瓶颈,我以前structured-streaming就体会过。
>这个可以考虑定期广播维表去提供性能
>
>
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
>发送时间:&nbsp;2021年4月22日(星期四) 上午10:50
>收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
>主题:&nbsp;Re: flink sql消费kafka join普通表为何会性能爬坡?
>
>
>
>为了测试到底是因为SQL里的parse json导致性能瓶颈,还是因为join维表
>我在SQL中去掉join维表之后重启,发现只需要70s即可达到消费速度=3.8k,整个因为重启导致的积压被压缩到3分钟
>
>所以应该是维表JOIN的问题
>现在连的数据库是TiDB,连接串属性为
>useUnicode=true&amp;characterEncoding=UTF-8&amp;serverTimezone=Asia/Shanghai&amp;rewriteBatchedStatements=true
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Re:回复: flink sql消费kafka join普通表为何会性能爬坡?

Xi Shen
我这边有使用jdbc table属性加了本地缓存

尝试把cache size设置为400/20000/40000,然后重启,消费kafka速度都是需要慢慢上涨



--
Sent from: http://apache-flink.147419.n8.nabble.com/