Version:
Flink SQL 1.11.0

Requirement:
Aggregate multiple rows into a single row.

Code:
import pandas as pd
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment

environmentSettings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings=environmentSettings)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')

# table_a: three rows with the same id, each pointing at a different uuid
a_df = pd.DataFrame({"id": ["1", "1", "1"], "uuid": ["1", "2", "3"]})
a_table = t_env.from_pandas(a_df,
                            DataTypes.ROW([DataTypes.FIELD("id", DataTypes.STRING()),
                                           DataTypes.FIELD("uuid", DataTypes.STRING())]))
t_env.create_temporary_view("table_a", a_table)

# table_b: the values to be concatenated, keyed by uuid
b_df = pd.DataFrame({"val": ["3", "4", "5", "6"], "uuid": ["1", "2", "3", "4"]})
table_b = t_env.from_pandas(b_df,
                            DataTypes.ROW([DataTypes.FIELD("val", DataTypes.STRING()),
                                           DataTypes.FIELD("uuid", DataTypes.STRING())]))
t_env.create_temporary_view("table_b", table_b)

t_env.sql_update("""
CREATE TABLE mySink (
    b varchar,
    c varchar
) WITH (
    'connector' = 'print'
)
""")

t_env.sql_update("""
insert into mySink
select t1.id, LISTAGG(t2.val, ',')
from table_a t1 left join table_b t2 on t1.uuid = t2.uuid
group by t1.id
""")
t_env.execute("tutorial_job")

Error:
Caused by: java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData
    at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
    at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139)
    at org.apache.flink.table.data.RowData.get(RowData.java:273)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$910/1247863497.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:745)
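To sanity-check what the job above should produce once LISTAGG works, the LEFT JOIN + GROUP BY + LISTAGG semantics can be reproduced on the same sample rows in plain Python. This is a minimal sketch under stated assumptions, not Flink code: `left_join_listagg` is a hypothetical helper, not a Flink API; it assumes LISTAGG skips NULL values and returns NULL for a group with no non-NULL values (standard SQL behavior); and it concatenates in input order, whereas Flink does not guarantee the order of LISTAGG results in a streaming job.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Same sample rows as the PyFlink job: table_a is (id, uuid), table_b is (val, uuid).
TABLE_A: List[Tuple[str, str]] = [("1", "1"), ("1", "2"), ("1", "3")]
TABLE_B: List[Tuple[str, str]] = [("3", "1"), ("4", "2"), ("5", "3"), ("6", "4")]


def left_join_listagg(
    table_a: List[Tuple[str, str]],
    table_b: List[Tuple[str, str]],
    sep: str = ",",
) -> Dict[str, Optional[str]]:
    """Hypothetical helper emulating:
    SELECT t1.id, LISTAGG(t2.val, sep)
    FROM table_a t1 LEFT JOIN table_b t2 ON t1.uuid = t2.uuid GROUP BY t1.id"""
    # Index table_b by uuid so each table_a row can look up its matches.
    vals_by_uuid: Dict[str, List[str]] = defaultdict(list)
    for val, uuid in table_b:
        vals_by_uuid[uuid].append(val)

    # LEFT JOIN: keep every table_a row; an unmatched row contributes one NULL (None).
    grouped: Dict[str, List[Optional[str]]] = defaultdict(list)
    for id_, uuid in table_a:
        grouped[id_].extend(vals_by_uuid.get(uuid) or [None])

    # LISTAGG (assumed semantics): skip NULLs; a group with no non-NULL values yields NULL.
    result: Dict[str, Optional[str]] = {}
    for id_, vals in grouped.items():
        non_null = [v for v in vals if v is not None]
        result[id_] = sep.join(non_null) if non_null else None
    return result


print(left_join_listagg(TABLE_A, TABLE_B))  # {'1': '3,4,5'}
```

With this data every table_a row finds a match, so the expected final value for id "1" is 3, 4 and 5 joined by commas (possibly in a different order in Flink); the row with uuid "4" in table_b is never used. Because the job is a streaming GROUP BY, the print sink is likely to show intermediate update rows before that final value.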
This looks like a known bug [1]. It has already been fixed, but the fix has not been released yet.
[1] https://issues.apache.org/jira/browse/FLINK-18862

On Wed, Aug 12, 2020 at 9:25 PM, chenxuying <[hidden email]> wrote:
> [...]

--
Best,
Benchao Li
OK, so it's a bug. Thanks for the answer.
On 2020-08-12 21:32:40, "Benchao Li" <[hidden email]> wrote:
> [...]