不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND '20170307'" # 获取Query结果 query_table = env.sql_query(sql) query_table.to_pandas() 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢? 由于python只是封装了一下flink的接口,所以会是GIL的影响么? 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx ! |
Hi,
差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。 Best Xingbo xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道: > 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2. > sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID > = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND > '20170307'" > # 获取Query结果 > query_table = env.sql_query(sql) > query_table.to_pandas() > 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢? > 由于python只是封装了一下flink的接口,所以会是GIL的影响么? > 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx ! > > |
Hi, Xingbo
非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法, 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程, 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道: >Hi, > >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。 > >Best >Xingbo > >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道: > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2. >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND >> '20170307'" >> # 获取Query结果 >> query_table = env.sql_query(sql) >> query_table.to_pandas() >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢? >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么? >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx ! >> >> |
In reply to this post by Xingbo Huang
所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~!
在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道: >Hi, > >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。 > >Best >Xingbo > >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道: > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2. >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND >> '20170307'" >> # 获取Query结果 >> query_table = env.sql_query(sql) >> query_table.to_pandas() >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢? >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么? >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx ! >> >> |
In reply to this post by Xingbo Huang
Hi, Xingbo
非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法, 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程, 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。 所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~! 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道: >Hi, > >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。 > >Best >Xingbo > >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道: > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2. >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND >> '20170307'" >> # 获取Query结果 >> query_table = env.sql_query(sql) >> query_table.to_pandas() >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢? >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么? >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx ! >> >> |
Hi,
你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。 Best, Xingbo xiaoyue <[hidden email]> 于2021年3月1日周一 上午10:34写道: > Hi, Xingbo > 非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法, > 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程, > 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。 > > 所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~! > > > > > > > > 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道: > >Hi, > > > > >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。 > > > >Best > >Xingbo > > > >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道: > > > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2. > >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON > source1.ID > >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND > >> '20170307'" > >> # 获取Query结果 > >> query_table = env.sql_query(sql) > >> query_table.to_pandas() > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢? > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么? > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx ! > >> > >> > |
Hi,
我是用的flink1.12的pandas类型的udaf, 代码如下: @udaf(result_type=DataTypes.FLOAT(), func_type="pandas") def logReturn(i, j): df = pd.DataFrame({'id': i, 'rate': j}) df['rate1'] = df['rate'] + 1 return numpy.prod(df['rate1']) - 1 调用方式为: result = query_table.group_by(query_table.PF_ID).select(query_table.ID, logReturn( query_table.ID, query_table.RATE)).execute_insert('print').wait() 这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的; java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果; 但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。 总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~ 小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~ [hidden email] 发件人: Xingbo Huang 发送时间: 2021-03-02 09:42 收件人: user-zh 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s Hi, 你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。 Best, Xingbo xiaoyue <[hidden email]> 于2021年3月1日周一 上午10:34写道: > Hi, Xingbo > 非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法, > 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程, > 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。 > > 所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~! > > > > > > > > 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道: > >Hi, > > > > >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。 > > > >Best > >Xingbo > > > >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道: > > > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2. > >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON > source1.ID > >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND > >> '20170307'" > >> # 获取Query结果 > >> query_table = env.sql_query(sql) > >> query_table.to_pandas() > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢? > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么? > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx ! > >> > >> > |
Hi,
首先,我假定你是在batch模式上跑的Pandas UDAF(unbounded stream上不支持pandas udaf)。 然后,我再确认另一件事,你使用java写了一个java版本的udaf(logReturn),同你写的这个python版本的udaf进行对比,时间上java版本是3s?python版本的要8分钟? Best, Xingbo [hidden email] <[hidden email]> 于2021年3月2日周二 上午9:57写道: > Hi, > 我是用的flink1.12的pandas类型的udaf, 代码如下: > @udaf(result_type=DataTypes.FLOAT(), func_type="pandas") > def logReturn(i, j): > df = pd.DataFrame({'id': i, 'rate': j}) > df['rate1'] = df['rate'] + 1 > return numpy.prod(df['rate1']) - 1 > 调用方式为: > result = > query_table.group_by(query_table.PF_ID).select(query_table.ID, > logReturn( > query_table.ID, > > query_table.RATE)).execute_insert('print').wait() > 这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的; > java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果; > 但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。 > 总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~ > 小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~ > > > > [hidden email] > > 发件人: Xingbo Huang > 发送时间: 2021-03-02 09:42 > 收件人: user-zh > 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s > Hi, > > 你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas > > udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。 > > Best, > Xingbo > > xiaoyue <[hidden email]> 于2021年3月1日周一 上午10:34写道: > > > Hi, Xingbo > > 非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法, > > 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程, > > 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。 > > > > 所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or > numpy中的矩阵计算,非常感谢~! > > > > > > > > > > > > > > > > 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道: > > >Hi, > > > > > > > > >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。 > > > > > >Best > > >Xingbo > > > > > >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道: > > > > > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2. > > >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON > > source1.ID > > >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND > > >> '20170307'" > > >> # 获取Query结果 > > >> query_table = env.sql_query(sql) > > >> query_table.to_pandas() > > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢? > > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么? > > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx ! > > >> > > >> > > > |
Hi,
是的,就是在batch模式下,我是只在本机local下执行的,不是集群模式,把全部代码贴一下吧。 python版: # 建立环境(udaf仅支持批环境) env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build() env = BatchTableEnvironment.create(environment_settings=env_settings) # 表1 1千万行 source_ddl1 = """CREATE TABLE TP_GL_DAY (DAY_ID VARCHAR(8),IS_EXCH_DAY DECIMAL ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://ip:port/db?useSSL=False', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'root', 'password' = 'xxx', 'table-name' = 'TP_GL_DAY') """ #表2 700多行 source_ddl2 = """CREATE TABLE TS_PF_SEC_YLDRATE (PF_ID VARCHAR(10),\ SYMBOL_ID VARCHAR(20),BIZ_DATE VARCHAR(8),\ CCY_TYPE VARCHAR(10),YLDRATE DECIMAL(18,12) ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://ip:port/db?useSSL=False', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'root', 'password' = 'xxx', 'table-name' = 'TS_PF_SEC_YLDRATE') """ # sink print_sink_ddl = """ CREATE TABLE print( pf_id VARCHAR(10), out_put FLOAT ) WITH ( 'connector' = 'print' ) """ # 源表 env.execute_sql(source_ddl1) env.execute_sql(source_ddl2) # sink env.execute_sql(print_sink_ddl) sql = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '123' AND SYMBOL_ID = '456' AND BIZ_DATE BETWEEN '20160701' AND '20170307'" # 获取Query结果 query_table = env.sql_query(sql) # 执行udaf # udaf 聚合函数计算 @udaf(result_type=DataTypes.FLOAT(), func_type="pandas") def logReturn(i, j): df = pd.DataFrame({'pf_id': i, 'yldrate': j}) df['yldrate1'] = df['yldrate'] + 1 return np.prod(df['yldrate1']) - 1 # 执行并打印 result = query_table.group_by(query_table.PF_ID).select(query_table.PF_ID, logReturn( query_table.PF_ID, query_table.YLDRATE)).execute_insert('print').wait() Java版本: Java选用的环境是流环境: StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings streamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, streamSettings); streamEnv.execute(""); 计算部分: java这边的queryData也是通过定义connector DDL注册源表后,执行sql获取的。 tableEnv.registerFunction("add", new addFunction()); tableEnv.registerFunction("prod", new ProductUdaf()); Table addedTable = tableEnv.sqlQuery("SELECT pf_id,add(yldrate) as yldrate FROM queryData"); tableEnv.createTemporaryView("addedTable", addedTable); Table resultTable = tableEnv.sqlQuery("SELECT pf_id,prod(yldrate)-1 as yldrate FROM addedTable group by pf_id"); 因为java版本代码,是同事写的,但逻辑按照python这边的逻辑,执行时间上python看本机的cpu占用情况(每次执行时不超过8%)会跑400或500s不等,基本维持在400s左右;我的电脑是win10 64位,RAM16GB,主频2.3GHz, 内核4,逻辑处理器8. [hidden email] 发件人: Xingbo Huang 发送时间: 2021-03-02 11:59 收件人: user-zh 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s Hi, 首先,我假定你是在batch模式上跑的Pandas UDAF(unbounded stream上不支持pandas udaf)。 然后,我再确认另一件事,你使用java写了一个java版本的udaf(logReturn),同你写的这个python版本的udaf进行对比,时间上java版本是3s?python版本的要8分钟? Best, Xingbo [hidden email] <[hidden email]> 于2021年3月2日周二 上午9:57写道: > Hi, > 我是用的flink1.12的pandas类型的udaf, 代码如下: > @udaf(result_type=DataTypes.FLOAT(), func_type="pandas") > def logReturn(i, j): > df = pd.DataFrame({'id': i, 'rate': j}) > df['rate1'] = df['rate'] + 1 > return numpy.prod(df['rate1']) - 1 > 调用方式为: > result = > query_table.group_by(query_table.PF_ID).select(query_table.ID, > logReturn( > query_table.ID, > > query_table.RATE)).execute_insert('print').wait() > 这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的; > java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果; > 但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。 > 总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~ > 小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~ > > > > [hidden email] > > 发件人: Xingbo Huang > 发送时间: 2021-03-02 09:42 > 收件人: user-zh > 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s > Hi, > > 你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas > > udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。 > > Best, > Xingbo > > xiaoyue <[hidden email]> 于2021年3月1日周一 上午10:34写道: > > > Hi, Xingbo > > 非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法, > > 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程, > > 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。 > > > > 所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or > numpy中的矩阵计算,非常感谢~! > > > > > > > > > > > > > > > > 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道: > > >Hi, > > > > > > > > >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。 > > > > > >Best > > >Xingbo > > > > > >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道: > > > > > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2. > > >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON > > source1.ID > > >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND > > >> '20170307'" > > >> # 获取Query结果 > > >> query_table = env.sql_query(sql) > > >> query_table.to_pandas() > > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢? > > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么? > > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx ! > > >> > > >> > > > |
Hi,
不好意思回复这么晚。关于pandas udaf,我有专门测试过框架层的开销(函数用普通的均值计算)。和java相比,差距也就3,4倍左右,具体可以参考代码[1]。关于你这个代码,我怀疑是因为你函数实现的问题。你这个函数构造df是会有额外的开销。你为啥不直接使用j来进行计算。当然了,你也可以根据调整一些参数来提高性能,比如python.fn-execution.bundle.size和python.fn-execution.bundle.time,具体可以参考文档[2]。 [1] https://github.com/HuangXingBo/pyflink-performance-demo/blob/master/python/flink/flink-pandas-udaf-test.py [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-fn-execution-bundle-size Best, Xingbo [hidden email] <[hidden email]> 于2021年3月2日周二 下午1:38写道: > Hi, > 是的,就是在batch模式下,我是只在本机local下执行的,不是集群模式,把全部代码贴一下吧。 > python版: > # 建立环境(udaf仅支持批环境) > env_settings = > EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build() > env = BatchTableEnvironment.create(environment_settings=env_settings) > # 表1 1千万行 > source_ddl1 = """CREATE TABLE TP_GL_DAY (DAY_ID > VARCHAR(8),IS_EXCH_DAY DECIMAL > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://ip:port/db?useSSL=False', > 'driver' = 'com.mysql.cj.jdbc.Driver', > 'username' = 'root', > 'password' = 'xxx', > 'table-name' = 'TP_GL_DAY') > """ > #表2 700多行 > source_ddl2 = """CREATE TABLE TS_PF_SEC_YLDRATE (PF_ID VARCHAR(10),\ > SYMBOL_ID VARCHAR(20),BIZ_DATE VARCHAR(8),\ > CCY_TYPE VARCHAR(10),YLDRATE DECIMAL(18,12) > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://ip:port/db?useSSL=False', > 'driver' = 'com.mysql.cj.jdbc.Driver', > 'username' = 'root', > 'password' = 'xxx', > 'table-name' = 'TS_PF_SEC_YLDRATE') > """ > # sink > print_sink_ddl = """ > CREATE TABLE print( > pf_id VARCHAR(10), > out_put FLOAT > ) WITH ( > 'connector' = 'print' > ) > """ > # 源表 > env.execute_sql(source_ddl1) > env.execute_sql(source_ddl2) > # sink > env.execute_sql(print_sink_ddl) > > sql = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN > TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '123' AND SYMBOL_ID = > '456' AND BIZ_DATE BETWEEN '20160701' AND '20170307'" > > # 获取Query结果 > query_table = env.sql_query(sql) > # 执行udaf > # udaf 聚合函数计算 > @udaf(result_type=DataTypes.FLOAT(), func_type="pandas") > def logReturn(i, j): > df = pd.DataFrame({'pf_id': i, 'yldrate': j}) > df['yldrate1'] = df['yldrate'] + 1 > return np.prod(df['yldrate1']) - 1 > # 执行并打印 > result = > query_table.group_by(query_table.PF_ID).select(query_table.PF_ID, > logReturn( > query_table.PF_ID, > > query_table.YLDRATE)).execute_insert('print').wait() > > Java版本: > Java选用的环境是流环境: > StreamExecutionEnvironment streamEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings streamSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(streamEnv, streamSettings); > streamEnv.execute(""); > 计算部分: > java这边的queryData也是通过定义connector DDL注册源表后,执行sql获取的。 > tableEnv.registerFunction("add", new addFunction()); > tableEnv.registerFunction("prod", new ProductUdaf()); > Table addedTable = tableEnv.sqlQuery("SELECT pf_id,add(yldrate) as > yldrate FROM queryData"); > tableEnv.createTemporaryView("addedTable", addedTable); > Table resultTable = tableEnv.sqlQuery("SELECT > pf_id,prod(yldrate)-1 as yldrate FROM addedTable group by pf_id"); > > 因为java版本代码,是同事写的,但逻辑按照python这边的逻辑,执行时间上python看本机的cpu占用情况(每次执行时不超过8%)会跑400或500s不等,基本维持在400s左右;我的电脑是win10 > 64位,RAM16GB,主频2.3GHz, 内核4,逻辑处理器8. > > > [hidden email] > > 发件人: Xingbo Huang > 发送时间: 2021-03-02 11:59 > 收件人: user-zh > 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s > Hi, > > 首先,我假定你是在batch模式上跑的Pandas UDAF(unbounded stream上不支持pandas udaf)。 > > 然后,我再确认另一件事,你使用java写了一个java版本的udaf(logReturn),同你写的这个python版本的udaf进行对比,时间上java版本是3s?python版本的要8分钟? > > Best, > Xingbo > > [hidden email] <[hidden email]> 于2021年3月2日周二 上午9:57写道: > > > Hi, > > 我是用的flink1.12的pandas类型的udaf, 代码如下: > > @udaf(result_type=DataTypes.FLOAT(), func_type="pandas") > > def logReturn(i, j): > > df = pd.DataFrame({'id': i, 'rate': j}) > > df['rate1'] = df['rate'] + 1 > > return numpy.prod(df['rate1']) - 1 > > 调用方式为: > > result = > > query_table.group_by(query_table.PF_ID).select(query_table.ID, > > logReturn( > > query_table.ID, > > > > query_table.RATE)).execute_insert('print').wait() > > 这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的; > > java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果; > > 但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。 > > 总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~ > > 小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~ > > > > > > > > [hidden email] > > > > 发件人: Xingbo Huang > > 发送时间: 2021-03-02 09:42 > > 收件人: user-zh > > 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s > > Hi, > > > > 你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas > > > > > udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。 > > > > Best, > > Xingbo > > > > xiaoyue <[hidden email]> 于2021年3月1日周一 上午10:34写道: > > > > > Hi, Xingbo > > > 非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法, > > > 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程, > > > 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。 > > > > > > 所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or > > numpy中的矩阵计算,非常感谢~! > > > > > > > > > > > > > > > > > > > > > > > > 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道: > > > >Hi, > > > > > > > > > > > > > >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。 > > > > > > > >Best > > > >Xingbo > > > > > > > >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道: > > > > > > > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2. > > > >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON > > > source1.ID > > > >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND > > > >> '20170307'" > > > >> # 获取Query结果 > > > >> query_table = env.sql_query(sql) > > > >> query_table.to_pandas() > > > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢? > > > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么? > > > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx ! > > > >> > > > >> > > > > > > |
Free forum by Nabble | Edit this page |