回复:Pandas UDF处理过的数据sink问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

回复:Pandas UDF处理过的数据sink问题

Lucas
多谢。现在用逗号拼合字符串推mq,剩下交给后续程序自行处理了



---原始邮件---
发件人: "Xingbo Huang"<[hidden email]&gt;
发送时间: 2020年12月14日(周一) 晚上7:46
收件人: "user-zh"<[hidden email]&gt;;
主题: Re: Re: Pandas UDF处理过的数据sink问题


Hi,

join
udtf和你认为的两个table的join是不一样的,只是因为udtf会返回多条结果,需要左边的一条输入和多条的udtf输出拼接在一起,所以用join。对于udf只会返回一条输出,所以是一对一的拼接。如果你udtf只返回一条结果,这个拼接和udf就是类似的。udtf是能直接扩展列的,而udf,
udaf都没法直接扩展列的,除非你能使用row-based的那套operation[1],不过这个feature在1.13
PyFlink才会支持[2]。

当然了,你可以按照weizhong的方式,一个udaf,直接返回一个Row类型的数据,然后再去get(0),get(1)的方式去拿也可以,不过在1.12只有普通的Python
UDAF是支持返回一个Row类型的,Pandas
UDAF没法支持你返回一个Row类型的结果,不过这个feature在master(1.13)上已经支持了[3]。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations
[2] https://issues.apache.org/jira/browse/FLINK-20479
[3] https://issues.apache.org/jira/browse/FLINK-20507

Best,
Xingbo


Best,
Xingbo

[hidden email] <[hidden email]&gt; 于2020年12月14日周一 下午2:29写道:

&gt; Hi xingbo,
&gt; 文档中给的例子udtf需要和join一起使用,但是我现在不需要join,只是单纯的转换结果
&gt; 如果直接调用了udtf后sink,会提示
&gt; Cause: Different number of columns.
&gt; Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT &gt;]
&gt;
&gt; Sink schema:&nbsp; [buyQtl: BIGINT, aveBuy: INT]
&gt;
&gt;
&gt; [hidden email]
&gt;
&gt; 发件人: Xingbo Huang
&gt; 发送时间: 2020-12-14 11:38
&gt; 收件人: user-zh
&gt; 主题: Re: Re: Pandas UDF处理过的数据sink问题
&gt; Hi,
&gt; 你想要一列变多列的话,你需要使用UDTF了,具体使用方式,你可以参考文档[1]
&gt;
&gt; [1]
&gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions
&gt; Best,
&gt; Xingbo
&gt;
&gt; [hidden email] <[hidden email]&gt; 于2020年12月14日周一
&gt; 上午11:00写道:
&gt;
&gt; &gt; 多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。
&gt; &gt; 但现在有另一个问题,根据文档
&gt; &gt;
&gt; &gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
&gt; &gt; Vectorized Python aggregate functions takes one or more pandas.Series as
&gt; &gt; the inputs and return one scalar value as output.
&gt; &gt; Note The return type does not support RowType and MapType for the time
&gt; &gt; being.
&gt; &gt; udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。
&gt; &gt; 现在是后面用另一个udf把这个string再做拆分,代码大概如下:
&gt; &gt; @udf(result_type=DataTypes.ROW(
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; [DataTypes.FIELD('value1', DataTypes.BIGINT()),
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; DataTypes.FIELD('value2', DataTypes.INT())]))
&gt; &gt; def flattenStr(inputStr):
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; ret_array = [int(x) for x in inputStr.split(',')]
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; return Row(ret_array[0], ret_array[1])
&gt; &gt; t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table
&gt; =
&gt; &gt; order_table.window(tumble_window) \
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; .group_by("w") \
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; .select("**调用udaf** as aggValue")
&gt; &gt; result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
&gt; &gt;
&gt; &gt;
&gt; result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的
&gt; &gt; Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; &gt;
&gt; org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; &gt;
&gt; org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; &gt;
&gt; org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; &gt;
&gt; org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
&gt; &gt;
&gt; &gt;
&gt; &gt; [hidden email]
&gt; &gt;
&gt; &gt; 发件人: Wei Zhong
&gt; &gt; 发送时间: 2020-12-14 10:38
&gt; &gt; 收件人: user-zh
&gt; &gt; 主题: Re: Pandas UDF处理过的数据sink问题
&gt; &gt; Hi Lucas,
&gt; &gt;
&gt; &gt; 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
&gt; &gt;
&gt; &gt; 你可以尝试将sql语句改成以下形式:
&gt; &gt;
&gt; &gt; select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
&gt; &gt; from `some_source`
&gt; &gt; group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
&gt; &gt;
&gt; &gt; 此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”
&gt; &gt;
&gt; &gt; Best,
&gt; &gt; Wei
&gt; &gt;
&gt; &gt; &gt; 在 2020年12月13日,13:13,Lucas <[hidden email]&gt; 写道:
&gt; &gt; &gt;
&gt; &gt; &gt; 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
&gt; &gt; &gt;
&gt; &gt; &gt; @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; result_type=DataTypes.ROW(
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; DataTypes.FIELD('aveBuy', DataTypes.INT())),
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; func_type='pandas')
&gt; &gt; &gt; def orderCalc(code, amount):
&gt; &gt; &gt;
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp; df = pd.DataFrame({'code': code, 'amount': amount})
&gt; &gt; &gt; # pandas 数据处理后输入另一个dataframe output
&gt; &gt; &gt; return (output['buyQtl'], output['aveBuy'])
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt; 定义了csv的sink如下
&gt; &gt; &gt;
&gt; &gt; &gt; create table csvSink (
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp; buyQtl BIGINT,
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp; aveBuy INT
&gt; &gt; &gt; ) with (
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp; 'connector.type' = 'filesystem',
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp; 'format.type' = 'csv',
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp; 'connector.path' = 'e:/output'
&gt; &gt; &gt; )
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt; 然后进行如下的操作:
&gt; &gt; &gt;
&gt; &gt; &gt; result_table = t_env.sql_query("""
&gt; &gt; &gt; select orderCalc(code, amount)
&gt; &gt; &gt; from `some_source`
&gt; &gt; &gt; group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
&gt; &gt; &gt; """)
&gt; &gt; &gt; result_table.execute_insert("csvSink")
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt; 在执行程序的时候提示没法入库
&gt; &gt; &gt;
&gt; &gt; &gt; py4j.protocol.Py4JJavaError: An error occurred while calling
&gt; &gt; &gt; o98.executeInsert.
&gt; &gt; &gt;
&gt; &gt; &gt; : org.apache.flink.table.api.ValidationException: Column types of query
&gt; &gt; &gt; result and sink for registered table
&gt; &gt; &gt; 'default_catalog.default_database.csvSink' do not match.
&gt; &gt; &gt;
&gt; &gt; &gt; Cause: Different number of columns.
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt; Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT &gt;]
&gt; &gt; &gt;
&gt; &gt; &gt; Sink schema:&nbsp; [buyQtl: BIGINT, aveBuy: INT]
&gt; &gt; &gt;
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; &gt; &gt;
&gt; &gt;
&gt; org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
&gt; &gt; &gt; ception(DynamicSinkUtils.java:304)
&gt; &gt; &gt;
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; &gt; &gt;
&gt; &gt;
&gt; org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
&gt; &gt; &gt; ImplicitCast(DynamicSinkUtils.java:134)
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt; 是UDF的输出结构不对吗,还是需要调整sink table的结构?
&gt; &gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt;