Pandas UDF处理过的数据sink问题

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Pandas UDF处理过的数据sink问题

Lucas
使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下

@udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
     result_type=DataTypes.ROW(
         [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
          DataTypes.FIELD('aveBuy', DataTypes.INT())),
     func_type='pandas')
def orderCalc(code, amount):

    df = pd.DataFrame({'code': code, 'amount': amount})
# pandas 数据处理后输入另一个dataframe output
return (output['buyQtl'], output['aveBuy'])
 

定义了csv的sink如下

create table csvSink (
    buyQtl BIGINT,
    aveBuy INT
) with (
    'connector.type' = 'filesystem',
    'format.type' = 'csv',
    'connector.path' = 'e:/output'
)

 

然后进行如下的操作:

result_table = t_env.sql_query("""
select orderCalc(code, amount)
from `some_source`
group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
""")
result_table.execute_insert("csvSink")

 

在执行程序的时候提示没法入库

py4j.protocol.Py4JJavaError: An error occurred while calling
o98.executeInsert.

: org.apache.flink.table.api.ValidationException: Column types of query
result and sink for registered table
'default_catalog.default_database.csvSink' do not match.

Cause: Different number of columns.

 

Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]

Sink schema:  [buyQtl: BIGINT, aveBuy: INT]

        at
org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
ception(DynamicSinkUtils.java:304)

        at
org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
ImplicitCast(DynamicSinkUtils.java:134)

 

是UDF的输出结构不对吗,还是需要调整sink table的结构?

Reply | Threaded
Open this post in threaded view
|

Re: Pandas UDF处理过的数据sink问题

Wei Zhong
Hi Lucas,

是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。

你可以尝试将sql语句改成以下形式:

select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
from `some_source`
group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount

此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”

Best,
Wei

> 在 2020年12月13日,13:13,Lucas <[hidden email]> 写道:
>
> 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
>     result_type=DataTypes.ROW(
>         [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
>          DataTypes.FIELD('aveBuy', DataTypes.INT())),
>     func_type='pandas')
> def orderCalc(code, amount):
>
>    df = pd.DataFrame({'code': code, 'amount': amount})
> # pandas 数据处理后输入另一个dataframe output
> return (output['buyQtl'], output['aveBuy'])
>
>
> 定义了csv的sink如下
>
> create table csvSink (
>    buyQtl BIGINT,
>    aveBuy INT
> ) with (
>    'connector.type' = 'filesystem',
>    'format.type' = 'csv',
>    'connector.path' = 'e:/output'
> )
>
>
>
> 然后进行如下的操作:
>
> result_table = t_env.sql_query("""
> select orderCalc(code, amount)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> """)
> result_table.execute_insert("csvSink")
>
>
>
> 在执行程序的时候提示没法入库
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o98.executeInsert.
>
> : org.apache.flink.table.api.ValidationException: Column types of query
> result and sink for registered table
> 'default_catalog.default_database.csvSink' do not match.
>
> Cause: Different number of columns.
>
>
>
> Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
>
> Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
>
>        at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> ception(DynamicSinkUtils.java:304)
>
>        at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> ImplicitCast(DynamicSinkUtils.java:134)
>
>
>
> 是UDF的输出结构不对吗,还是需要调整sink table的结构?
>

Reply | Threaded
Open this post in threaded view
|

回复: Re: Pandas UDF处理过的数据sink问题

Lucas
多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。
但现在有另一个问题,根据文档
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions 
Vectorized Python aggregate functions takes one or more pandas.Series as the inputs and return one scalar value as output.
Note The return type does not support RowType and MapType for the time being.
udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。
现在是后面用另一个udf把这个string再做拆分,代码大概如下:
@udf(result_type=DataTypes.ROW(
    [DataTypes.FIELD('value1', DataTypes.BIGINT()),
     DataTypes.FIELD('value2', DataTypes.INT())]))
def flattenStr(inputStr):
    ret_array = [int(x) for x in inputStr.split(',')]
    return Row(ret_array[0], ret_array[1])
t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table = order_table.window(tumble_window) \
    .group_by("w") \
    .select("**调用udaf** as aggValue")
result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
    at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)


[hidden email]
 
发件人: Wei Zhong
发送时间: 2020-12-14 10:38
收件人: user-zh
主题: Re: Pandas UDF处理过的数据sink问题
Hi Lucas,
 
是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
 
你可以尝试将sql语句改成以下形式:
 
select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
from `some_source`
group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
 
此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”
 
Best,
Wei
 

> 在 2020年12月13日,13:13,Lucas <[hidden email]> 写道:
>
> 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
>     result_type=DataTypes.ROW(
>         [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
>          DataTypes.FIELD('aveBuy', DataTypes.INT())),
>     func_type='pandas')
> def orderCalc(code, amount):
>
>    df = pd.DataFrame({'code': code, 'amount': amount})
> # pandas 数据处理后输入另一个dataframe output
> return (output['buyQtl'], output['aveBuy'])
>
>
> 定义了csv的sink如下
>
> create table csvSink (
>    buyQtl BIGINT,
>    aveBuy INT
> ) with (
>    'connector.type' = 'filesystem',
>    'format.type' = 'csv',
>    'connector.path' = 'e:/output'
> )
>
>
>
> 然后进行如下的操作:
>
> result_table = t_env.sql_query("""
> select orderCalc(code, amount)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> """)
> result_table.execute_insert("csvSink")
>
>
>
> 在执行程序的时候提示没法入库
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o98.executeInsert.
>
> : org.apache.flink.table.api.ValidationException: Column types of query
> result and sink for registered table
> 'default_catalog.default_database.csvSink' do not match.
>
> Cause: Different number of columns.
>
>
>
> Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
>
> Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
>
>        at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> ception(DynamicSinkUtils.java:304)
>
>        at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> ImplicitCast(DynamicSinkUtils.java:134)
>
>
>
> 是UDF的输出结构不对吗,还是需要调整sink table的结构?
>
 
 
Reply | Threaded
Open this post in threaded view
|

Re: Re: Pandas UDF处理过的数据sink问题

Xingbo Huang
In reply to this post by Wei Zhong
Hi,
你想要一列变多列的话,你需要使用UDTF了,具体使用方式,你可以参考文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions
Best,
Xingbo

[hidden email] <[hidden email]> 于2020年12月14日周一 上午11:00写道:

> 多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。
> 但现在有另一个问题,根据文档
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
> Vectorized Python aggregate functions takes one or more pandas.Series as
> the inputs and return one scalar value as output.
> Note The return type does not support RowType and MapType for the time
> being.
> udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。
> 现在是后面用另一个udf把这个string再做拆分,代码大概如下:
> @udf(result_type=DataTypes.ROW(
>     [DataTypes.FIELD('value1', DataTypes.BIGINT()),
>      DataTypes.FIELD('value2', DataTypes.INT())]))
> def flattenStr(inputStr):
>     ret_array = [int(x) for x in inputStr.split(',')]
>     return Row(ret_array[0], ret_array[1])
> t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table =
> order_table.window(tumble_window) \
>     .group_by("w") \
>     .select("**调用udaf** as aggValue")
> result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
>
> result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
>     at
> org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
>     at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
>     at
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
>     at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
>
>
> [hidden email]
>
> 发件人: Wei Zhong
> 发送时间: 2020-12-14 10:38
> 收件人: user-zh
> 主题: Re: Pandas UDF处理过的数据sink问题
> Hi Lucas,
>
> 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
>
> 你可以尝试将sql语句改成以下形式:
>
> select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
>
> 此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”
>
> Best,
> Wei
>
> > 在 2020年12月13日,13:13,Lucas <[hidden email]> 写道:
> >
> > 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
> >
> > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> >     result_type=DataTypes.ROW(
> >         [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
> >          DataTypes.FIELD('aveBuy', DataTypes.INT())),
> >     func_type='pandas')
> > def orderCalc(code, amount):
> >
> >    df = pd.DataFrame({'code': code, 'amount': amount})
> > # pandas 数据处理后输入另一个dataframe output
> > return (output['buyQtl'], output['aveBuy'])
> >
> >
> > 定义了csv的sink如下
> >
> > create table csvSink (
> >    buyQtl BIGINT,
> >    aveBuy INT
> > ) with (
> >    'connector.type' = 'filesystem',
> >    'format.type' = 'csv',
> >    'connector.path' = 'e:/output'
> > )
> >
> >
> >
> > 然后进行如下的操作:
> >
> > result_table = t_env.sql_query("""
> > select orderCalc(code, amount)
> > from `some_source`
> > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> > """)
> > result_table.execute_insert("csvSink")
> >
> >
> >
> > 在执行程序的时候提示没法入库
> >
> > py4j.protocol.Py4JJavaError: An error occurred while calling
> > o98.executeInsert.
> >
> > : org.apache.flink.table.api.ValidationException: Column types of query
> > result and sink for registered table
> > 'default_catalog.default_database.csvSink' do not match.
> >
> > Cause: Different number of columns.
> >
> >
> >
> > Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
> >
> > Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
> >
> >        at
> >
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> > ception(DynamicSinkUtils.java:304)
> >
> >        at
> >
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> > ImplicitCast(DynamicSinkUtils.java:134)
> >
> >
> >
> > 是UDF的输出结构不对吗,还是需要调整sink table的结构?
> >
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Pandas UDF处理过的数据sink问题

Lucas
Hi xingbo,
文档中给的例子udtf需要和join一起使用,但是我现在不需要join,只是单纯的转换结果
如果直接调用了udtf后sink,会提示
Cause: Different number of columns.
Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]

Sink schema:  [buyQtl: BIGINT, aveBuy: INT]


[hidden email]
 
发件人: Xingbo Huang
发送时间: 2020-12-14 11:38
收件人: user-zh
主题: Re: Re: Pandas UDF处理过的数据sink问题
Hi,
你想要一列变多列的话,你需要使用UDTF了,具体使用方式,你可以参考文档[1]
 
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions
Best,
Xingbo
 
[hidden email] <[hidden email]> 于2020年12月14日周一 上午11:00写道:
 

> 多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。
> 但现在有另一个问题,根据文档
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
> Vectorized Python aggregate functions takes one or more pandas.Series as
> the inputs and return one scalar value as output.
> Note The return type does not support RowType and MapType for the time
> being.
> udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。
> 现在是后面用另一个udf把这个string再做拆分,代码大概如下:
> @udf(result_type=DataTypes.ROW(
>     [DataTypes.FIELD('value1', DataTypes.BIGINT()),
>      DataTypes.FIELD('value2', DataTypes.INT())]))
> def flattenStr(inputStr):
>     ret_array = [int(x) for x in inputStr.split(',')]
>     return Row(ret_array[0], ret_array[1])
> t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table =
> order_table.window(tumble_window) \
>     .group_by("w") \
>     .select("**调用udaf** as aggValue")
> result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
>
> result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
>     at
> org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
>     at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
>     at
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
>     at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
>
>
> [hidden email]
>
> 发件人: Wei Zhong
> 发送时间: 2020-12-14 10:38
> 收件人: user-zh
> 主题: Re: Pandas UDF处理过的数据sink问题
> Hi Lucas,
>
> 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
>
> 你可以尝试将sql语句改成以下形式:
>
> select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
>
> 此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”
>
> Best,
> Wei
>
> > 在 2020年12月13日,13:13,Lucas <[hidden email]> 写道:
> >
> > 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
> >
> > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> >     result_type=DataTypes.ROW(
> >         [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
> >          DataTypes.FIELD('aveBuy', DataTypes.INT())),
> >     func_type='pandas')
> > def orderCalc(code, amount):
> >
> >    df = pd.DataFrame({'code': code, 'amount': amount})
> > # pandas 数据处理后输入另一个dataframe output
> > return (output['buyQtl'], output['aveBuy'])
> >
> >
> > 定义了csv的sink如下
> >
> > create table csvSink (
> >    buyQtl BIGINT,
> >    aveBuy INT
> > ) with (
> >    'connector.type' = 'filesystem',
> >    'format.type' = 'csv',
> >    'connector.path' = 'e:/output'
> > )
> >
> >
> >
> > 然后进行如下的操作:
> >
> > result_table = t_env.sql_query("""
> > select orderCalc(code, amount)
> > from `some_source`
> > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> > """)
> > result_table.execute_insert("csvSink")
> >
> >
> >
> > 在执行程序的时候提示没法入库
> >
> > py4j.protocol.Py4JJavaError: An error occurred while calling
> > o98.executeInsert.
> >
> > : org.apache.flink.table.api.ValidationException: Column types of query
> > result and sink for registered table
> > 'default_catalog.default_database.csvSink' do not match.
> >
> > Cause: Different number of columns.
> >
> >
> >
> > Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
> >
> > Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
> >
> >        at
> >
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> > ception(DynamicSinkUtils.java:304)
> >
> >        at
> >
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> > ImplicitCast(DynamicSinkUtils.java:134)
> >
> >
> >
> > 是UDF的输出结构不对吗,还是需要调整sink table的结构?
> >
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Pandas UDF处理过的数据sink问题

Xingbo Huang
In reply to this post by Xingbo Huang
Hi,

join
udtf和你认为的两个table的join是不一样的,只是因为udtf会返回多条结果,需要左边的一条输入和多条的udtf输出拼接在一起,所以用join。对于udf只会返回一条输出,所以是一对一的拼接。如果你udtf只返回一条结果,这个拼接和udf就是类似的。udtf是能直接扩展列的,而udf,
udaf都没法直接扩展列的,除非你能使用row-based的那套operation[1],不过这个feature在1.13
PyFlink才会支持[2]。

当然了,你可以按照weizhong的方式,一个udaf,直接返回一个Row类型的数据,然后再去get(0),get(1)的方式去拿也可以,不过在1.12只有普通的Python
UDAF是支持返回一个Row类型的,Pandas
UDAF没法支持你返回一个Row类型的结果,不过这个feature在master(1.13)上已经支持了[3]。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations
[2] https://issues.apache.org/jira/browse/FLINK-20479
[3] https://issues.apache.org/jira/browse/FLINK-20507

Best,
Xingbo


Best,
Xingbo

[hidden email] <[hidden email]> 于2020年12月14日周一 下午2:29写道:

> Hi xingbo,
> 文档中给的例子udtf需要和join一起使用,但是我现在不需要join,只是单纯的转换结果
> 如果直接调用了udtf后sink,会提示
> Cause: Different number of columns.
> Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
>
> Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
>
>
> [hidden email]
>
> 发件人: Xingbo Huang
> 发送时间: 2020-12-14 11:38
> 收件人: user-zh
> 主题: Re: Re: Pandas UDF处理过的数据sink问题
> Hi,
> 你想要一列变多列的话,你需要使用UDTF了,具体使用方式,你可以参考文档[1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions
> Best,
> Xingbo
>
> [hidden email] <[hidden email]> 于2020年12月14日周一
> 上午11:00写道:
>
> > 多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。
> > 但现在有另一个问题,根据文档
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
> > Vectorized Python aggregate functions takes one or more pandas.Series as
> > the inputs and return one scalar value as output.
> > Note The return type does not support RowType and MapType for the time
> > being.
> > udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。
> > 现在是后面用另一个udf把这个string再做拆分,代码大概如下:
> > @udf(result_type=DataTypes.ROW(
> >     [DataTypes.FIELD('value1', DataTypes.BIGINT()),
> >      DataTypes.FIELD('value2', DataTypes.INT())]))
> > def flattenStr(inputStr):
> >     ret_array = [int(x) for x in inputStr.split(',')]
> >     return Row(ret_array[0], ret_array[1])
> > t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table
> =
> > order_table.window(tumble_window) \
> >     .group_by("w") \
> >     .select("**调用udaf** as aggValue")
> > result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
> >
> >
> result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的
> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
> >     at
> >
> org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
> >     at
> >
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
> >     at
> >
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
> >     at
> >
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
> >
> >
> > [hidden email]
> >
> > 发件人: Wei Zhong
> > 发送时间: 2020-12-14 10:38
> > 收件人: user-zh
> > 主题: Re: Pandas UDF处理过的数据sink问题
> > Hi Lucas,
> >
> > 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
> >
> > 你可以尝试将sql语句改成以下形式:
> >
> > select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
> > from `some_source`
> > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> >
> > 此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”
> >
> > Best,
> > Wei
> >
> > > 在 2020年12月13日,13:13,Lucas <[hidden email]> 写道:
> > >
> > > 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
> > >
> > > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> > >     result_type=DataTypes.ROW(
> > >         [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
> > >          DataTypes.FIELD('aveBuy', DataTypes.INT())),
> > >     func_type='pandas')
> > > def orderCalc(code, amount):
> > >
> > >    df = pd.DataFrame({'code': code, 'amount': amount})
> > > # pandas 数据处理后输入另一个dataframe output
> > > return (output['buyQtl'], output['aveBuy'])
> > >
> > >
> > > 定义了csv的sink如下
> > >
> > > create table csvSink (
> > >    buyQtl BIGINT,
> > >    aveBuy INT
> > > ) with (
> > >    'connector.type' = 'filesystem',
> > >    'format.type' = 'csv',
> > >    'connector.path' = 'e:/output'
> > > )
> > >
> > >
> > >
> > > 然后进行如下的操作:
> > >
> > > result_table = t_env.sql_query("""
> > > select orderCalc(code, amount)
> > > from `some_source`
> > > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> > > """)
> > > result_table.execute_insert("csvSink")
> > >
> > >
> > >
> > > 在执行程序的时候提示没法入库
> > >
> > > py4j.protocol.Py4JJavaError: An error occurred while calling
> > > o98.executeInsert.
> > >
> > > : org.apache.flink.table.api.ValidationException: Column types of query
> > > result and sink for registered table
> > > 'default_catalog.default_database.csvSink' do not match.
> > >
> > > Cause: Different number of columns.
> > >
> > >
> > >
> > > Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
> > >
> > > Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
> > >
> > >        at
> > >
> >
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> > > ception(DynamicSinkUtils.java:304)
> > >
> > >        at
> > >
> >
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> > > ImplicitCast(DynamicSinkUtils.java:134)
> > >
> > >
> > >
> > > 是UDF的输出结构不对吗,还是需要调整sink table的结构?
> > >
> >
> >
> >
>