Login  Register

Re: Pandas UDF处理过的数据sink问题

Posted by Wei Zhong on Dec 14, 2020; 2:38am
URL: http://apache-flink.370.s1.nabble.com/Pandas-UDF-sink-tp9371p9377.html

Hi Lucas,

是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。

你可以尝试将sql语句改成以下形式:

select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
from `some_source`
group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount

此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”

Best,
Wei

> 在 2020年12月13日,13:13,Lucas <[hidden email]> 写道:
>
> 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
>     result_type=DataTypes.ROW(
>         [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
>          DataTypes.FIELD('aveBuy', DataTypes.INT())),
>     func_type='pandas')
> def orderCalc(code, amount):
>
>    df = pd.DataFrame({'code': code, 'amount': amount})
> # pandas 数据处理后输入另一个dataframe output
> return (output['buyQtl'], output['aveBuy'])
>
>
> 定义了csv的sink如下
>
> create table csvSink (
>    buyQtl BIGINT,
>    aveBuy INT
> ) with (
>    'connector.type' = 'filesystem',
>    'format.type' = 'csv',
>    'connector.path' = 'e:/output'
> )
>
>
>
> 然后进行如下的操作:
>
> result_table = t_env.sql_query("""
> select orderCalc(code, amount)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> """)
> result_table.execute_insert("csvSink")
>
>
>
> 在执行程序的时候提示没法入库
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o98.executeInsert.
>
> : org.apache.flink.table.api.ValidationException: Column types of query
> result and sink for registered table
> 'default_catalog.default_database.csvSink' do not match.
>
> Cause: Different number of columns.
>
>
>
> Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
>
> Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
>
>        at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> ception(DynamicSinkUtils.java:304)
>
>        at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> ImplicitCast(DynamicSinkUtils.java:134)
>
>
>
> 是UDF的输出结构不对吗,还是需要调整sink table的结构?
>