flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s

classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s

肖越
不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
 sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND '20170307'"
# 获取Query结果
    query_table = env.sql_query(sql)
    query_table.to_pandas()
相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
由于python只是封装了一下flink的接口,所以会是GIL的影响么?
蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !

Reply | Threaded
Open this post in threaded view
|

Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s

Xingbo Huang
Hi,

差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。

Best
Xingbo

xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道:

> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
> '20170307'"
> # 获取Query结果
>     query_table = env.sql_query(sql)
>     query_table.to_pandas()
> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s

肖越
Hi, Xingbo
    非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
    项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
    也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。



















在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道:

>Hi,
>
>差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
>
>Best
>Xingbo
>
>xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道:
>
>> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
>>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
>> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
>> '20170307'"
>> # 获取Query结果
>>     query_table = env.sql_query(sql)
>>     query_table.to_pandas()
>> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
>> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
>> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s

肖越
In reply to this post by Xingbo Huang
所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~!

















在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道:

>Hi,
>
>差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
>
>Best
>Xingbo
>
>xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道:
>
>> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
>>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
>> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
>> '20170307'"
>> # 获取Query结果
>>     query_table = env.sql_query(sql)
>>     query_table.to_pandas()
>> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
>> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
>> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s

肖越
In reply to this post by Xingbo Huang
Hi, Xingbo
    非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
    项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
    也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。

    所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~!







在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道:

>Hi,
>
>差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
>
>Best
>Xingbo
>
>xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道:
>
>> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
>>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
>> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
>> '20170307'"
>> # 获取Query结果
>>     query_table = env.sql_query(sql)
>>     query_table.to_pandas()
>> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
>> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
>> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s

Xingbo Huang
Hi,

你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas
udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。

Best,
Xingbo

xiaoyue <[hidden email]> 于2021年3月1日周一 上午10:34写道:

> Hi, Xingbo
>     非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
>     项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
>     也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
>
>     所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~!
>
>
>
>
>
>
>
> 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道:
> >Hi,
> >
>
> >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
> >
> >Best
> >Xingbo
> >
> >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道:
> >
> >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> >>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON
> source1.ID
> >> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
> >> '20170307'"
> >> # 获取Query结果
> >>     query_table = env.sql_query(sql)
> >>     query_table.to_pandas()
> >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
> >>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s

xiaoyue@ysstech.com
Hi,
    我是用的flink1.12的pandas类型的udaf, 代码如下:
    @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
    def logReturn(i, j):
        df = pd.DataFrame({'id': i, 'rate': j})
        df['rate1'] = df['rate'] + 1
        return numpy.prod(df['rate1']) - 1
    调用方式为:
     result = query_table.group_by(query_table.PF_ID).select(query_table.ID,
                                                     logReturn(
                                                         query_table.ID,
                                                         query_table.RATE)).execute_insert('print').wait()
这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的;
java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果;
但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。
总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~
小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~



[hidden email]
 
发件人: Xingbo Huang
发送时间: 2021-03-02 09:42
收件人: user-zh
主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Hi,
 
你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas
udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。
 
Best,
Xingbo
 
xiaoyue <[hidden email]> 于2021年3月1日周一 上午10:34写道:
 

> Hi, Xingbo
>     非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
>     项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
>     也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
>
>     所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~!
>
>
>
>
>
>
>
> 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道:
> >Hi,
> >
>
> >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
> >
> >Best
> >Xingbo
> >
> >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道:
> >
> >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> >>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON
> source1.ID
> >> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
> >> '20170307'"
> >> # 获取Query结果
> >>     query_table = env.sql_query(sql)
> >>     query_table.to_pandas()
> >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
> >>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s

Xingbo Huang
Hi,

首先,我假定你是在batch模式上跑的Pandas UDAF(unbounded stream上不支持pandas udaf)。
然后,我再确认另一件事,你使用java写了一个java版本的udaf(logReturn),同你写的这个python版本的udaf进行对比,时间上java版本是3s?python版本的要8分钟?

Best,
Xingbo

[hidden email] <[hidden email]> 于2021年3月2日周二 上午9:57写道:

> Hi,
>     我是用的flink1.12的pandas类型的udaf, 代码如下:
>     @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
>     def logReturn(i, j):
>         df = pd.DataFrame({'id': i, 'rate': j})
>         df['rate1'] = df['rate'] + 1
>         return numpy.prod(df['rate1']) - 1
>     调用方式为:
>      result =
> query_table.group_by(query_table.PF_ID).select(query_table.ID,
>                                                      logReturn(
>                                                          query_table.ID,
>
>  query_table.RATE)).execute_insert('print').wait()
> 这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的;
> java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果;
> 但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。
> 总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~
> 小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~
>
>
>
> [hidden email]
>
> 发件人: Xingbo Huang
> 发送时间: 2021-03-02 09:42
> 收件人: user-zh
> 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
> Hi,
>
> 你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas
>
> udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。
>
> Best,
> Xingbo
>
> xiaoyue <[hidden email]> 于2021年3月1日周一 上午10:34写道:
>
> > Hi, Xingbo
> >     非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
> >     项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
> >     也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
> >
> >     所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or
> numpy中的矩阵计算,非常感谢~!
> >
> >
> >
> >
> >
> >
> >
> > 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道:
> > >Hi,
> > >
> >
> >
> >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
> > >
> > >Best
> > >Xingbo
> > >
> > >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道:
> > >
> > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> > >>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON
> > source1.ID
> > >> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
> > >> '20170307'"
> > >> # 获取Query结果
> > >>     query_table = env.sql_query(sql)
> > >>     query_table.to_pandas()
> > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
> > >>
> > >>
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s

xiaoyue@ysstech.com
Hi,
    是的,就是在batch模式下,我是只在本机local下执行的,不是集群模式,把全部代码贴一下吧。
    python版:
    # 建立环境(udaf仅支持批环境)
    env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
    env = BatchTableEnvironment.create(environment_settings=env_settings)
    # 表1 1千万行  
     source_ddl1 = """CREATE TABLE TP_GL_DAY (DAY_ID VARCHAR(8),IS_EXCH_DAY DECIMAL
                        ) WITH (
                        'connector' = 'jdbc',
                        'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
                        'driver' = 'com.mysql.cj.jdbc.Driver',
                        'username' = 'root',
                        'password' = 'xxx',
                        'table-name' = 'TP_GL_DAY')
                """
    #表2 700多行
    source_ddl2 = """CREATE TABLE TS_PF_SEC_YLDRATE (PF_ID VARCHAR(10),\
                    SYMBOL_ID VARCHAR(20),BIZ_DATE VARCHAR(8),\
                    CCY_TYPE VARCHAR(10),YLDRATE DECIMAL(18,12)
                    ) WITH (
                    'connector' = 'jdbc',
                    'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
                    'driver' = 'com.mysql.cj.jdbc.Driver',
                    'username' = 'root',
                    'password' = 'xxx',
                    'table-name' = 'TS_PF_SEC_YLDRATE')
            """
   # sink
     print_sink_ddl = """
                      CREATE TABLE print(
                        pf_id VARCHAR(10),
                        out_put FLOAT
                    ) WITH (
                      'connector' = 'print'
                    )
                """
    # 源表
    env.execute_sql(source_ddl1)
    env.execute_sql(source_ddl2)
   # sink
     env.execute_sql(print_sink_ddl)

    sql = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '123' AND SYMBOL_ID = '456' AND BIZ_DATE BETWEEN '20160701' AND '20170307'"

    # 获取Query结果
    query_table = env.sql_query(sql)
    # 执行udaf
    # udaf 聚合函数计算
    @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
    def logReturn(i, j):
        df = pd.DataFrame({'pf_id': i, 'yldrate': j})
        df['yldrate1'] = df['yldrate'] + 1
        return np.prod(df['yldrate1']) - 1
    # 执行并打印
    result = query_table.group_by(query_table.PF_ID).select(query_table.PF_ID,
                                                    logReturn(
                                                        query_table.PF_ID,
                                                        query_table.YLDRATE)).execute_insert('print').wait()

    Java版本:
    Java选用的环境是流环境:
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings streamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
        streamEnv.execute("");
    计算部分:
        java这边的queryData也是通过定义connector DDL注册源表后,执行sql获取的。
        tableEnv.registerFunction("add", new addFunction());
        tableEnv.registerFunction("prod", new ProductUdaf());
        Table addedTable = tableEnv.sqlQuery("SELECT pf_id,add(yldrate) as yldrate FROM queryData");
        tableEnv.createTemporaryView("addedTable", addedTable);
        Table resultTable = tableEnv.sqlQuery("SELECT pf_id,prod(yldrate)-1 as yldrate FROM addedTable group by pf_id");

因为java版本代码,是同事写的,但逻辑按照python这边的逻辑,执行时间上python看本机的cpu占用情况(每次执行时不超过8%)会跑400或500s不等,基本维持在400s左右;我的电脑是win10 64位,RAM16GB,主频2.3GHz, 内核4,逻辑处理器8.


[hidden email]
 
发件人: Xingbo Huang
发送时间: 2021-03-02 11:59
收件人: user-zh
主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Hi,
 
首先,我假定你是在batch模式上跑的Pandas UDAF(unbounded stream上不支持pandas udaf)。
然后,我再确认另一件事,你使用java写了一个java版本的udaf(logReturn),同你写的这个python版本的udaf进行对比,时间上java版本是3s?python版本的要8分钟?
 
Best,
Xingbo
 
[hidden email] <[hidden email]> 于2021年3月2日周二 上午9:57写道:
 

> Hi,
>     我是用的flink1.12的pandas类型的udaf, 代码如下:
>     @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
>     def logReturn(i, j):
>         df = pd.DataFrame({'id': i, 'rate': j})
>         df['rate1'] = df['rate'] + 1
>         return numpy.prod(df['rate1']) - 1
>     调用方式为:
>      result =
> query_table.group_by(query_table.PF_ID).select(query_table.ID,
>                                                      logReturn(
>                                                          query_table.ID,
>
>  query_table.RATE)).execute_insert('print').wait()
> 这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的;
> java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果;
> 但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。
> 总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~
> 小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~
>
>
>
> [hidden email]
>
> 发件人: Xingbo Huang
> 发送时间: 2021-03-02 09:42
> 收件人: user-zh
> 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
> Hi,
>
> 你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas
>
> udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。
>
> Best,
> Xingbo
>
> xiaoyue <[hidden email]> 于2021年3月1日周一 上午10:34写道:
>
> > Hi, Xingbo
> >     非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
> >     项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
> >     也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
> >
> >     所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or
> numpy中的矩阵计算,非常感谢~!
> >
> >
> >
> >
> >
> >
> >
> > 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道:
> > >Hi,
> > >
> >
> >
> >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
> > >
> > >Best
> > >Xingbo
> > >
> > >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道:
> > >
> > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> > >>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON
> > source1.ID
> > >> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
> > >> '20170307'"
> > >> # 获取Query结果
> > >>     query_table = env.sql_query(sql)
> > >>     query_table.to_pandas()
> > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
> > >>
> > >>
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s

Xingbo Huang
Hi,

不好意思回复这么晚。关于pandas
udaf,我有专门测试过框架层的开销(函数用普通的均值计算)。和java相比,差距也就3,4倍左右,具体可以参考代码[1]。关于你这个代码,我怀疑是因为你函数实现的问题。你这个函数构造df是会有额外的开销。你为啥不直接使用j来进行计算。当然了,你也可以根据调整一些参数来提高性能,比如python.fn-execution.bundle.size和python.fn-execution.bundle.time,具体可以参考文档[2]。



[1]
https://github.com/HuangXingBo/pyflink-performance-demo/blob/master/python/flink/flink-pandas-udaf-test.py
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-fn-execution-bundle-size

Best,
Xingbo

[hidden email] <[hidden email]> 于2021年3月2日周二 下午1:38写道:

> Hi,
>     是的,就是在batch模式下,我是只在本机local下执行的,不是集群模式,把全部代码贴一下吧。
>     python版:
>     # 建立环境(udaf仅支持批环境)
>     env_settings =
> EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
>     env = BatchTableEnvironment.create(environment_settings=env_settings)
>     # 表1 1千万行
>      source_ddl1 = """CREATE TABLE TP_GL_DAY (DAY_ID
> VARCHAR(8),IS_EXCH_DAY DECIMAL
>                         ) WITH (
>                         'connector' = 'jdbc',
>                         'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
>                         'driver' = 'com.mysql.cj.jdbc.Driver',
>                         'username' = 'root',
>                         'password' = 'xxx',
>                         'table-name' = 'TP_GL_DAY')
>                 """
>     #表2 700多行
>     source_ddl2 = """CREATE TABLE TS_PF_SEC_YLDRATE (PF_ID VARCHAR(10),\
>                     SYMBOL_ID VARCHAR(20),BIZ_DATE VARCHAR(8),\
>                     CCY_TYPE VARCHAR(10),YLDRATE DECIMAL(18,12)
>                     ) WITH (
>                     'connector' = 'jdbc',
>                     'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
>                     'driver' = 'com.mysql.cj.jdbc.Driver',
>                     'username' = 'root',
>                     'password' = 'xxx',
>                     'table-name' = 'TS_PF_SEC_YLDRATE')
>             """
>    # sink
>      print_sink_ddl = """
>                       CREATE TABLE print(
>                         pf_id VARCHAR(10),
>                         out_put FLOAT
>                     ) WITH (
>                       'connector' = 'print'
>                     )
>                 """
>     # 源表
>     env.execute_sql(source_ddl1)
>     env.execute_sql(source_ddl2)
>    # sink
>      env.execute_sql(print_sink_ddl)
>
>     sql = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN
> TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '123' AND SYMBOL_ID =
> '456' AND BIZ_DATE BETWEEN '20160701' AND '20170307'"
>
>     # 获取Query结果
>     query_table = env.sql_query(sql)
>     # 执行udaf
>     # udaf 聚合函数计算
>     @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
>     def logReturn(i, j):
>         df = pd.DataFrame({'pf_id': i, 'yldrate': j})
>         df['yldrate1'] = df['yldrate'] + 1
>         return np.prod(df['yldrate1']) - 1
>     # 执行并打印
>     result =
> query_table.group_by(query_table.PF_ID).select(query_table.PF_ID,
>                                                     logReturn(
>                                                         query_table.PF_ID,
>
> query_table.YLDRATE)).execute_insert('print').wait()
>
>     Java版本:
>     Java选用的环境是流环境:
>         StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings streamSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, streamSettings);
>         streamEnv.execute("");
>     计算部分:
>         java这边的queryData也是通过定义connector DDL注册源表后,执行sql获取的。
>         tableEnv.registerFunction("add", new addFunction());
>         tableEnv.registerFunction("prod", new ProductUdaf());
>         Table addedTable = tableEnv.sqlQuery("SELECT pf_id,add(yldrate) as
> yldrate FROM queryData");
>         tableEnv.createTemporaryView("addedTable", addedTable);
>         Table resultTable = tableEnv.sqlQuery("SELECT
> pf_id,prod(yldrate)-1 as yldrate FROM addedTable group by pf_id");
>
> 因为java版本代码,是同事写的,但逻辑按照python这边的逻辑,执行时间上python看本机的cpu占用情况(每次执行时不超过8%)会跑400或500s不等,基本维持在400s左右;我的电脑是win10
> 64位,RAM16GB,主频2.3GHz, 内核4,逻辑处理器8.
>
>
> [hidden email]
>
> 发件人: Xingbo Huang
> 发送时间: 2021-03-02 11:59
> 收件人: user-zh
> 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
> Hi,
>
> 首先,我假定你是在batch模式上跑的Pandas UDAF(unbounded stream上不支持pandas udaf)。
>
> 然后,我再确认另一件事,你使用java写了一个java版本的udaf(logReturn),同你写的这个python版本的udaf进行对比,时间上java版本是3s?python版本的要8分钟?
>
> Best,
> Xingbo
>
> [hidden email] <[hidden email]> 于2021年3月2日周二 上午9:57写道:
>
> > Hi,
> >     我是用的flink1.12的pandas类型的udaf, 代码如下:
> >     @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
> >     def logReturn(i, j):
> >         df = pd.DataFrame({'id': i, 'rate': j})
> >         df['rate1'] = df['rate'] + 1
> >         return numpy.prod(df['rate1']) - 1
> >     调用方式为:
> >      result =
> > query_table.group_by(query_table.PF_ID).select(query_table.ID,
> >                                                      logReturn(
> >                                                          query_table.ID,
> >
> >  query_table.RATE)).execute_insert('print').wait()
> > 这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的;
> > java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果;
> > 但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。
> > 总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~
> > 小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~
> >
> >
> >
> > [hidden email]
> >
> > 发件人: Xingbo Huang
> > 发送时间: 2021-03-02 09:42
> > 收件人: user-zh
> > 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
> > Hi,
> >
> > 你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas
> >
> >
> udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。
> >
> > Best,
> > Xingbo
> >
> > xiaoyue <[hidden email]> 于2021年3月1日周一 上午10:34写道:
> >
> > > Hi, Xingbo
> > >     非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
> > >     项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
> > >     也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
> > >
> > >     所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or
> > numpy中的矩阵计算,非常感谢~!
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2021-03-01 09:54:49,"Xingbo Huang" <[hidden email]> 写道:
> > > >Hi,
> > > >
> > >
> > >
> >
> >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
> > > >
> > > >Best
> > > >Xingbo
> > > >
> > > >xiaoyue <[hidden email]> 于2021年2月26日周五 下午12:38写道:
> > > >
> > > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> > > >>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON
> > > source1.ID
> > > >> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
> > > >> '20170307'"
> > > >> # 获取Query结果
> > > >>     query_table = env.sql_query(sql)
> > > >>     query_table.to_pandas()
> > > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> > > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> > > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
> > > >>
> > > >>
> > >
> >
>