pyflink资源优化问题,请教

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

pyflink资源优化问题,请教

苗红宾
你好:
业务场景是:数据源是kafka,10分钟总数据量在10G左右,里面包括200个城市的数据,期望使用滚动窗口,按城市分组,每2分钟触发一次将所有城市的过去十分钟数据放到各自的list里,然后转换成pandas,针对该城市做一次整体计算,每次每个城市的计算耗时平均在60s左右。


现在的使用方式:
1、slide_window = Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w")
2、使用sql语句注册kafka connector,
3、result table使用普通的print:
CREATE TABLE sink (
city_id STRING ,

start_time TIMESTAMP ,

end_time TIMESTAMP ,

flag   STRING

) with (
    'connector' = 'print'
)
4、通过udaf函数,把source的数据写入csv文件,source.select("write_csv(xxxx)"),然后调用计算函数,读取csv文件内容
5、触发计算逻辑通过select里调用自定义函数strategy_flow_entry的方式:source.window(slide_window).group_by("w, city_id").select("strategy_flow_entry(concat_result)").execute_insert("sink").wait()




这种方式在运行过程中,总是出各种各样问题,比如数据延迟等。


所以想请教一下:
1、针对这个场景,推荐的使用方式是什么?目前的使用方式是不是不太对
2、推荐的任务提交参数要怎么设置?cpu core、内存、并发数、slot等


多谢
Reply | Threaded
Open this post in threaded view
|

回复:pyflink资源优化问题,请教

郭华威
hidden email

在2021年04月06日 11:36,苗红宾 写道:
你好:
业务场景是:数据源是kafka,10分钟总数据量在10G左右,里面包括200个城市的数据,期望使用滚动窗口,按城市分组,每2分钟触发一次将所有城市的过去十分钟数据放到各自的list里,然后转换成pandas,针对该城市做一次整体计算,每次每个城市的计算耗时平均在60s左右。


现在的使用方式:
1、slide_window = Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w")
2、使用sql语句注册kafka connector,
3、result table使用普通的print:
CREATE TABLE sink (
city_id STRING ,

start_time TIMESTAMP ,

end_time TIMESTAMP ,

flag   STRING

) with (
   'connector' = 'print'
)
4、通过udaf函数,把source的数据写入csv文件,source.select("write_csv(xxxx)"),然后调用计算函数,读取csv文件内容
5、触发计算逻辑通过select里调用自定义函数strategy_flow_entry的方式:source.window(slide_window).group_by("w, city_id").select("strategy_flow_entry(concat_result)").execute_insert("sink").wait()




这种方式在运行过程中,总是出各种各样问题,比如数据延迟等。


所以想请教一下:
1、针对这个场景,推荐的使用方式是什么?目前的使用方式是不是不太对
2、推荐的任务提交参数要怎么设置?cpu core、内存、并发数、slot等


多谢
Reply | Threaded
Open this post in threaded view
|

Re: pyflink资源优化问题,请教

Dian Fu
处理逻辑看起来应该是没有问题的。

1)可以详细说一下,你说的数据延迟问题吗?现在的qps可以达到多少,预期是多少?
2)你现在用的哪种部署模式?
3)并发度的设置可以参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/table_environment.html#configuration <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/table_environment.html#configuration>
4)内存相关的配置的配置项可以看一下:https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#memory-configuration <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#memory-configuration>,设置方法,可以参考3)
5)至于并发度/内存设置成多少,这个完全取决于你的业务逻辑以及需要达到的qps,具体值需要测一下才知道设置成多少合适。


> 2021年4月6日 上午11:43,郭华威 <[hidden email]> 写道:
>
> hidden email
>
> 在2021年04月06日 11:36,苗红宾 写道:
> 你好:
> 业务场景是:数据源是kafka,10分钟总数据量在10G左右,里面包括200个城市的数据,期望使用滚动窗口,按城市分组,每2分钟触发一次将所有城市的过去十分钟数据放到各自的list里,然后转换成pandas,针对该城市做一次整体计算,每次每个城市的计算耗时平均在60s左右。
>
>
> 现在的使用方式:
> 1、slide_window = Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w")
> 2、使用sql语句注册kafka connector,
> 3、result table使用普通的print:
> CREATE TABLE sink (
> city_id STRING ,
>
> start_time TIMESTAMP ,
>
> end_time TIMESTAMP ,
>
> flag   STRING
>
> ) with (
>   'connector' = 'print'
> )
> 4、通过udaf函数,把source的数据写入csv文件,source.select("write_csv(xxxx)"),然后调用计算函数,读取csv文件内容
> 5、触发计算逻辑通过select里调用自定义函数strategy_flow_entry的方式:source.window(slide_window).group_by("w, city_id").select("strategy_flow_entry(concat_result)").execute_insert("sink").wait()
>
>
>
>
> 这种方式在运行过程中,总是出各种各样问题,比如数据延迟等。
>
>
> 所以想请教一下:
> 1、针对这个场景,推荐的使用方式是什么?目前的使用方式是不是不太对
> 2、推荐的任务提交参数要怎么设置?cpu core、内存、并发数、slot等
>
>
> 多谢