Problem sinking data processed by a Pandas UDF
Posted by Lucas on Dec 13, 2020; 5:13am
URL: http://apache-flink.370.s1.nabble.com/Pandas-UDF-sink-tp9371.html
I'm using Flink 1.12.0 with Python 3.7, and defined a pandas UDF roughly as follows:
@udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
     result_type=DataTypes.ROW(
         [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
          DataTypes.FIELD('aveBuy', DataTypes.INT())]),
     func_type='pandas')
def orderCalc(code, amount):
    df = pd.DataFrame({'code': code, 'amount': amount})
    # ... pandas processing that produces another DataFrame, `output`
    return (output['buyQtl'], output['aveBuy'])
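For reference, here is a minimal standalone sketch of what the elided pandas step could look like. The aggregation logic (count of orders and mean amount per code) is purely an assumption for illustration, since the original body is omitted:

```python
import pandas as pd

# Hypothetical stand-in for the elided processing step: the UDF receives
# the `code` and `amount` columns as pandas Series.
code = pd.Series(['A', 'A', 'B'])
amount = pd.Series([10.0, 20.0, 30.0])

df = pd.DataFrame({'code': code, 'amount': amount})

# Assumed aggregation: per code, count the orders (buyQtl) and take the
# mean purchase amount (aveBuy), using pandas named aggregation.
output = df.groupby('code', as_index=False).agg(
    buyQtl=('amount', 'count'),
    aveBuy=('amount', 'mean'),
)
print(output)
```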
The CSV sink is defined as follows:
create table csvSink (
    buyQtl BIGINT,
    aveBuy INT
) with (
    'connector.type' = 'filesystem',
    'format.type' = 'csv',
    'connector.path' = 'e:/output'
)
Then I run the following:
result_table = t_env.sql_query("""
    select orderCalc(code, amount)
    from `some_source`
    group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
""")
result_table.execute_insert("csvSink")
When I execute the program, it fails to write to the sink:
py4j.protocol.Py4JJavaError: An error occurred while calling o98.executeInsert.
: org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.csvSink' do not match.
Cause: Different number of columns.
Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT>]
Sink schema: [buyQtl: BIGINT, aveBuy: INT]
    at org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:304)
    at org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:134)
Is the UDF's output structure wrong, or do I need to adjust the sink table's schema?
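For what it's worth, the error text itself points at the shape of the problem: the query produces a single ROW-typed column (`EXPR$0`) while the sink declares two flat columns. One sketch of a rewrite that would flatten the ROW into separate columns, assuming Flink SQL's dot notation for ROW field access applies here:

```sql
-- Sketch: alias the UDF result and select its fields so the query
-- schema becomes [buyQtl: BIGINT, aveBuy: INT], matching the sink.
select t.r.buyQtl, t.r.aveBuy
from (
    select orderCalc(code, amount) as r
    from `some_source`
    group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
) t
```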