How do I use the LISTAGG aggregate function in Flink SQL?

cxydevelop@163.com
Version:
flinksql 1.11.0
Requirement:
I need to aggregate multiple rows into a single row.
The code is as follows:
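import pandas as pd
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment

environmentSettings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings = environmentSettings)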
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')


a_df = pd.DataFrame({"id":["1","1","1"],"uuid":["1","2","3"]})
a_table = t_env.from_pandas(a_df,
                    DataTypes.ROW([DataTypes.FIELD("id", DataTypes.STRING()),
                                         DataTypes.FIELD("uuid", DataTypes.STRING())]))
t_env.create_temporary_view("table_a",a_table)


b_df = pd.DataFrame({"val":["3","4","5","6"],"uuid":["1","2","3","4"]})
table_b = t_env.from_pandas(b_df ,
                    DataTypes.ROW([DataTypes.FIELD("val", DataTypes.STRING()),
                                         DataTypes.FIELD("uuid", DataTypes.STRING())]))
t_env.create_temporary_view("table_b",table_b)


t_env.sql_update("""
CREATE TABLE mySink (                                                                                        
b varchar ,
c varchar
) WITH (                                                        
'connector' = 'print'      
)
""")


t_env.sql_update("""
    insert into mySink
    select t1.id ,LISTAGG(t2.val , ',')
    from table_a t1 left join table_b t2 on t1.uuid = t2.uuid
    group by t1.id
""")
t_env.execute("tutorial_job")


Error:
Caused by: java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData
    at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
    at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139)
    at org.apache.flink.table.data.RowData.get(RowData.java:273)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$910/1247863497.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:745)
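
For reference, the result that the LEFT JOIN plus LISTAGG query in this post is aiming for can be sketched in plain Python, without Flink. This is only an illustration of the intended semantics on the sample rows, not Flink's implementation: the helper name left_join_listagg is hypothetical, and returning None for a group with no matching values is an assumption about NULL handling.

```python
from collections import defaultdict

# Same sample rows as table_a and table_b in the post.
table_a = [
    {"id": "1", "uuid": "1"},
    {"id": "1", "uuid": "2"},
    {"id": "1", "uuid": "3"},
]
table_b = [
    {"val": "3", "uuid": "1"},
    {"val": "4", "uuid": "2"},
    {"val": "5", "uuid": "3"},
    {"val": "6", "uuid": "4"},
]


def left_join_listagg(left, right, separator=","):
    """Hypothetical helper: LEFT JOIN on uuid, then LISTAGG(val) GROUP BY id."""
    # Index the right-hand rows by join key.
    vals_by_uuid = defaultdict(list)
    for row in right:
        vals_by_uuid[row["uuid"]].append(row["val"])

    # Collect the matching values per id; ids without any match keep an empty list,
    # which mirrors the LEFT JOIN keeping unmatched left rows.
    grouped = {}
    for row in left:
        bucket = grouped.setdefault(row["id"], [])
        bucket.extend(vals_by_uuid.get(row["uuid"], []))

    # Assumption: a group with no values yields NULL, modeled here as None.
    return {
        key: (separator.join(vals) if vals else None)
        for key, vals in grouped.items()
    }


result = left_join_listagg(table_a, table_b)
print(result)  # {'1': '3,4,5'}
```

The row with uuid "4" in table_b has no partner in table_a, so the value "6" does not appear. In a streaming job the order of the concatenated values may differ from this sketch.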




Re: How do I use the LISTAGG aggregate function in Flink SQL?

Benchao Li-2
This looks like a known bug [1]. It has been fixed, but the fix has not been released yet.

[1] https://issues.apache.org/jira/browse/FLINK-18862

In reply to the message from chenxuying <[hidden email]> of Wednesday, August 12, 2020, 9:25 PM

--

Best,
Benchao Li

Re: Re: How do I use the LISTAGG aggregate function in Flink SQL?

cxydevelop@163.com
OK, so it turns out to be a bug. Thanks for the answer.


In reply to the message from "Benchao Li" <[hidden email]> of 2020-08-12 21:32:40
