大家好,
flink sql消费kafka join普通表是会性能爬坡吗? 背景是flink 1.12.0 使用flink sql在yarn per-job发布,消费kafka topic=trades,然后join 数据库里的维表 shop_meta 现在发现每次重启flink sql job,或上游突然增加大量写入时,flink sql的消费速度总是慢慢增加上来,这样就会造成上游积压,等flink sql消费速度上来之后才能慢慢把积压消费完毕。 更多的信息: trades是avro格式,大概有10个字段,但其中有一个字段full_info是一个大json,我这边写了处理json的UDF,就为每个字段都需要处理那个大json。最后生成将近25个字段写下游kafka shop_meta是普通表,没有时间字段,总共有12个字段,30000行左右。整个表数据和索引加起来是16MB;更新频率非常低。现在读jdbc的配置为lookup.cache.max-rows = 20000;lookup.cache.ttl = 2h;scan.fetch-size = 1000 SQL示例如下 ``` SELECT t.shop_id, s.shop_name, ... CAST(json_path_to_str(full_info, '$.response.trade.price', '0.0') AS DOUBLE) price, " CAST(json_path_to_str(full_info, '$.response.trade.payment', '0.0') AS DOUBLE) payment, " CAST(json_path_to_str(full_info, '$.response.trade.total_fee', '0.0') AS DOUBLE) total_fee, " CAST(json_path_to_str(full_info, '$.response.trade.discount_fee', '0.0') AS DOUBLE) discount_fee, " CAST(json_path_to_str(full_info, '$.response.trade.adjust_fee', '0.0') AS DOUBLE) adjust_fee, " CAST(json_path_to_str(full_info, '$.response.trade.received_payment', '0.0') AS DOUBLE) received_payment, " CAST(json_path_to_str(full_info, '$.response.trade.post_fee', '0.0') AS DOUBLE) post_fee, " json_path_to_str(full_info, '$.response.trade.receiver_name', '') receiver_name, " json_path_to_str(full_info, '$.response.trade.receiver_country', '') receiver_country, " json_path_to_str(full_info, '$.response.trade.receiver_state', '') receiver_state, " json_path_to_str(full_info, '$.response.trade.receiver_city', '') receiver_city, " FROM trades t LEFT JOIN shop_meta FOR SYSTEM_TIME AS OF t.proc_time AS s ON t.shop_id=s.shop_id ``` 考虑到整个job里只有简单的ETL,不涉及中间状态,flink对task_manager的配置为 taskmanager.memory.managed.fraction = 0.1 taskmanager.memory.network.fraction = 0.05 实际运行中,task_manager总内存为6G,6 slots,最大并行度为6,所以只有一个task manager。 在监控页面看到task heap=4.13 GB,实际使用heap_used指标比较稳定。 在监控页面中可以看到随着消费速度越来越快,task manager CPU利用率越来越高,KafkaConsumer_topic_partition_currentOffsets - KafkaConsumer_topic_partition_committedOffsets 也在随着消费速度上涨,新生代GC次数和时间也在上涨 当消费完积压后,前两个指标降低,新生代GC趋于平稳 请问有什么调查或解决的方向吗? 谢谢大家 |
为了测试到底是因为SQL里的parse json导致性能瓶颈,还是因为join维表
我在SQL中去掉join维表之后重启,发现只需要70s即可达到消费速度=3.8k,整个因为重启导致的积压被压缩到3分钟 所以应该是维表JOIN的问题 现在连的数据库是TiDB,连接串属性为 useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
使用Tidb作为维表库,想利用Tidb的性能,提高维表的实时性,但是每个来一条数据,你都需要查询一次,即使是TiDB也会存在性能瓶颈,我以前structured-streaming就体会过。
这个可以考虑定期广播维表去提供性能 ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2021年4月22日(星期四) 上午10:50 收件人: "user-zh"<[hidden email]>; 主题: Re: flink sql消费kafka join普通表为何会性能爬坡? 为了测试到底是因为SQL里的parse json导致性能瓶颈,还是因为join维表 我在SQL中去掉join维表之后重启,发现只需要70s即可达到消费速度=3.8k,整个因为重启导致的积压被压缩到3分钟 所以应该是维表JOIN的问题 现在连的数据库是TiDB,连接串属性为 useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
读JDBC table是有缓存的,看了源码,是用Guava cache实现
文档上说,整个Task Manager进程共享使用一个Cache,所以应该和广播的效果是一样的?所以应该不是查询TiDB导致的性能问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by 飞翔
Cache设置大小为2w,超时时间为2h
实际上整个表大小为3w左右,考虑到整个表实际只有十几兆。我会尝试cache size设置为4w,保证整个表都能装进cache里。看会不会好一点 但是我查到现在怀疑跟savepoint有关: - 如果我设置kafka offset=earliest,不带savepoint重启,flink job启动消费时,lag有5000w左右,但是1分钟内就能达到约7k/s的消费速度。如下图,job在14:31启动,前面的速度特别大是因为offset重置,但是在14:33已经达到7.5k的消费速度 <http://apache-flink.147419.n8.nabble.com/file/t1249/fast.png> - 但是如果带savepoint启动,需要花35min才能达到这个消费速度。如下图 <http://apache-flink.147419.n8.nabble.com/file/t1249/slow.png> -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by 飞翔
你得加本地缓存之类的呗
在 2021-04-22 11:21:55,"飞翔" <[hidden email]> 写道: >使用Tidb作为维表库,想利用Tidb的性能,提高维表的实时性,但是每个来一条数据,你都需要查询一次,即使是TiDB也会存在性能瓶颈,我以前structured-streaming就体会过。 >这个可以考虑定期广播维表去提供性能 > > > > >------------------ 原始邮件 ------------------ >发件人: "user-zh" <[hidden email]>; >发送时间: 2021年4月22日(星期四) 上午10:50 >收件人: "user-zh"<[hidden email]>; > >主题: Re: flink sql消费kafka join普通表为何会性能爬坡? > > > >为了测试到底是因为SQL里的parse json导致性能瓶颈,还是因为join维表 >我在SQL中去掉join维表之后重启,发现只需要70s即可达到消费速度=3.8k,整个因为重启导致的积压被压缩到3分钟 > >所以应该是维表JOIN的问题 >现在连的数据库是TiDB,连接串属性为 >useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/ |
我这边有使用jdbc table属性加了本地缓存
尝试把cache size设置为400/20000/40000,然后重启,消费kafka速度都是需要慢慢上涨 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |