> Thanks for your reply. That problem is now resolved; as you said, @udf needed to be changed to @udaf.
> But now I've hit another problem. According to the documentation at
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
>
> > Vectorized Python aggregate functions takes one or more pandas.Series as
> > the inputs and return one scalar value as output.
> > Note The return type does not support RowType and MapType for the time
> > being.
> Since a udaf may only return a single value, inside the udaf I joined all the values with ',' and returned the result as a STRING. Sinking that STRING directly works fine.
> Now a second udf splits that string back apart; the code looks roughly like this:
> @udf(result_type=DataTypes.ROW(
>     [DataTypes.FIELD('value1', DataTypes.BIGINT()),
>      DataTypes.FIELD('value2', DataTypes.INT())]))
> def flattenStr(inputStr):
>     ret_array = [int(x) for x in inputStr.split(',')]
>     return Row(ret_array[0], ret_array[1])
>
> t_env.create_temporary_function("flattenStr", flattenStr)
> aggregate_table = order_table.window(tumble_window) \
>     .group_by("w") \
>     .select("**udaf call here** as aggValue")
> result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
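As an aside, the comma join inside the udaf and the split inside flattenStr are just an encode/decode round trip, which can be sanity-checked in plain Python without Flink. A minimal sketch (`encode_row` and `decode_row` are illustrative names, not part of the code above):

```python
# Minimal sketch of the round trip described above:
# encode_row mimics what the udaf does (join values into one STRING);
# decode_row mimics flattenStr (split back into typed fields).

def encode_row(values):
    """Join aggregate results into a single comma-separated string."""
    return ','.join(str(v) for v in values)

def decode_row(s):
    """Split the string back into (BIGINT-like, INT-like) values."""
    parts = [int(x) for x in s.split(',')]
    return parts[0], parts[1]

encoded = encode_row([42, 7])
print(encoded)              # prints 42,7
print(decode_row(encoded))  # prints (42, 7)
```

If the round trip works in isolation like this, the runtime failure is more likely in how the operators are wired than in the string handling itself.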
>
> result_table.select(result_table.retValue.flatten).execute_insert("csvSink")
>
> This compiles fine when submitted to Flink, but it fails at runtime. I don't understand what the error means; could it be caused by an incorrect return type from the udf?
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
>     at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
>     at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
>
>
>
[hidden email]
>
> From: Wei Zhong
> Sent: 2020-12-14 10:38
> To: user-zh
> Subject: Re: Sink issue for data processed by a Pandas UDF
> Hi Lucas,
>
> Here's what's happening: the Pandas UDF outputs a single ROW column, while your sink expects one BIGINT column plus one INT column.
>
> You can try rewriting the SQL statement in the following form:
>
> select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
>
> Also, isn't what you have here actually a Pandas UDAF use case? If so, you need to change "@udf" to "@udaf".
>
> Best,
> Wei
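For context on why that switch matters: a vectorized (Pandas) scalar UDF maps each input pandas.Series to an output Series of the same length, while a Pandas UDAF reduces its input Series to a single scalar per group or window. A pandas-only sketch of the two shapes (function names are illustrative; the real code would carry the @udf/@udaf decorators with func_type='pandas'):

```python
import pandas as pd

# Illustrative shapes only, no Flink involved.

def scalar_style(amount: pd.Series) -> pd.Series:
    # Pandas scalar UDF shape: one output row per input row.
    return amount * 2

def aggregate_style(amount: pd.Series) -> float:
    # Pandas UDAF shape: the whole group's Series reduces to one scalar.
    return float(amount.sum())

amounts = pd.Series([1.0, 2.0, 3.0])
print(scalar_style(amounts).tolist())  # [2.0, 4.0, 6.0]
print(aggregate_style(amounts))        # 6.0
```

Since the query groups by a tumbling window, the function receives one Series per window and must return one value, which is the UDAF shape.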
>
> > On Dec 13, 2020, at 13:13, Lucas <[hidden email]> wrote:
> >
> > I'm using Flink 1.12.0 with Python 3.7. I defined a pandas UDF roughly as follows:
> >
> > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> >      result_type=DataTypes.ROW(
> >          [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
> >           DataTypes.FIELD('aveBuy', DataTypes.INT())]),
> >      func_type='pandas')
> > def orderCalc(code, amount):
> >     df = pd.DataFrame({'code': code, 'amount': amount})
> >     # pandas processing here produces another dataframe, `output`
> >     return (output['buyQtl'], output['aveBuy'])
> >
> >
> > I defined a csv sink as follows:
> >
> > create table csvSink (
> > buyQtl BIGINT,
> > aveBuy INT
> > ) with (
> > 'connector.type' = 'filesystem',
> > 'format.type' = 'csv',
> > 'connector.path' = 'e:/output'
> > )
> >
> >
> >
> > Then I run the following:
> >
> > result_table = t_env.sql_query("""
> >     select orderCalc(code, amount)
> >     from `some_source`
> >     group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> > """)
> > result_table.execute_insert("csvSink")
> >
> >
> >
> > When the program runs, it reports that it cannot write to the sink:
> >
> > py4j.protocol.Py4JJavaError: An error occurred while calling
> > o98.executeInsert.
> >
> > : org.apache.flink.table.api.ValidationException: Column types of query
> > result and sink for registered table
> > 'default_catalog.default_database.csvSink' do not match.
> >
> > Cause: Different number of columns.
> >
> >
> >
> > Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
> >
> > Sink schema: [buyQtl: BIGINT, aveBuy: INT]
> >
> >     at org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:304)
> >     at org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:134)
> >
> >
> >
> > Is the UDF's output structure wrong, or do I need to adjust the sink table's schema?
> >
>
>
>