Posted by
Lucas on
Dec 14, 2020; 2:59am
URL: http://apache-flink.370.s1.nabble.com/Pandas-UDF-sink-tp9371p9379.html
多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。
但现在有另一个问题,根据文档
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
Vectorized Python aggregate functions takes one or more pandas.Series as the inputs and return one scalar value as output.
Note The return type does not support RowType and MapType for the time being.
udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。
现在是后面用另一个udf把这个string再做拆分,代码大概如下:
@udf(result_type=DataTypes.ROW(
[DataTypes.FIELD('value1', DataTypes.BIGINT()),
DataTypes.FIELD('value2', DataTypes.INT())]))
def flattenStr(inputStr):
ret_array = [int(x) for x in inputStr.split(',')]
return Row(ret_array[0], ret_array[1])
t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table = order_table.window(tumble_window) \
.group_by("w") \
.select("**调用udaf** as aggValue")
result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
[hidden email]
发件人: Wei Zhong
发送时间: 2020-12-14 10:38
收件人: user-zh
主题: Re: Pandas UDF处理过的数据sink问题
Hi Lucas,
是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
你可以尝试将sql语句改成以下形式:
select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
from `some_source`
group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”
Best,
Wei
> 在 2020年12月13日,13:13,Lucas <
[hidden email]> 写道:
>
> 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> result_type=DataTypes.ROW(
> [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
> DataTypes.FIELD('aveBuy', DataTypes.INT())),
> func_type='pandas')
> def orderCalc(code, amount):
>
> df = pd.DataFrame({'code': code, 'amount': amount})
> # pandas 数据处理后输入另一个dataframe output
> return (output['buyQtl'], output['aveBuy'])
>
>
> 定义了csv的sink如下
>
> create table csvSink (
> buyQtl BIGINT,
> aveBuy INT
> ) with (
> 'connector.type' = 'filesystem',
> 'format.type' = 'csv',
> 'connector.path' = 'e:/output'
> )
>
>
>
> 然后进行如下的操作:
>
> result_table = t_env.sql_query("""
> select orderCalc(code, amount)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> """)
> result_table.execute_insert("csvSink")
>
>
>
> 在执行程序的时候提示没法入库
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o98.executeInsert.
>
> : org.apache.flink.table.api.ValidationException: Column types of query
> result and sink for registered table
> 'default_catalog.default_database.csvSink' do not match.
>
> Cause: Different number of columns.
>
>
>
> Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
>
> Sink schema: [buyQtl: BIGINT, aveBuy: INT]
>
> at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> ception(DynamicSinkUtils.java:304)
>
> at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> ImplicitCast(DynamicSinkUtils.java:134)
>
>
>
> 是UDF的输出结构不对吗,还是需要调整sink table的结构?
>