Hi Jark:

Version: 1.11.0
Problem: In Flink SQL, writing data to a Kafka sink after a GROUP BY and a LEFT JOIN fails at the query-validation stage with:

    AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate

I would like the upsert operation to be accepted by the Kafka sink at SQL validation time, or alternatively to have the upsert completed first and only its result written to Kafka.

The SQL being executed:

create table kafka_table_1 (
  `shop_id` varchar,
  `user_id` bigint,
  `category_id` int,
  `ts` bigint,
  `row_time` timestamp(3),
  `proc_time` timestamp(3)
) with (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_visit_1',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.bootstrap.servers' = 'ip:9092',
  'connector.properties.zookeeper.connect' = 'ip:2181',
  'update-mode' = 'append',
  'format.type' = 'avro-registry',
  'format.schema-subject' = 'user_visit',
  'format.schema-url' = 'http://ip:8081'
);

CREATE TABLE hbase_table (
  rowKey STRING,
  cf ROW<age STRING, area STRING>
) WITH (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'hbase_table',
  'connector.zookeeper.quorum' = 'ip:2181',
  'connector.zookeeper.znode.parent' = '/hbase',
  'connector.write.buffer-flush.max-rows' = '1000'
);

create table kafka_table_2 (
  `shop_id` varchar,
  `age` varchar,
  `area` varchar
) with (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_visit_2',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.bootstrap.servers' = 'ip:9092',
  'connector.properties.zookeeper.connect' = 'ip:2181',
  'update-mode' = 'append',
  'format.type' = 'avro-registry',
  'format.schema-subject' = 'user_visit',
  'format.schema-url' = 'http://ip:8081'
);

insert into kafka_table_2 (shop_id, age, area)
select shop_id, age, area
from kafka_table_1
left join hbase_table for system_time as of kafka_table_1.proc_time as temp
  on kafka_table_1.shop_id = temp.rowKey
group by shop_id, age, area;

---------- Original message ----------
From: xiao cai <[hidden email]>
To: user-zh <[hidden email]>
Sent: Wednesday, August 12, 2020, 15:41
Subject: AppendStreamTableSink doesn't support consuming update changes
Should `update-mode` be set to `update` instead of `append`?
In reply to this post by xiao cai
Hi,

GROUP BY and LEFT JOIN both emit retract messages, and those can only be consumed by an UpsertStreamTableSink. The Kafka connector is currently implemented as an AppendStreamTableSink, so it cannot handle them. The community already has an issue [1] tracking this; the feature should be available in 1.12.

Best,
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-18826
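For reference, the work tracked in FLINK-18826 shipped in Flink 1.12 as the `upsert-kafka` connector. A minimal sketch of how the sink table from this thread might be declared with it (the topic, server address, and formats below are placeholders carried over from the question, and the primary key is assumed to match the GROUP BY key):

```sql
-- Sketch only: requires Flink 1.12+, where the upsert-kafka connector exists.
-- Topic, bootstrap servers, and formats are placeholder assumptions.
CREATE TABLE kafka_table_2 (
  `shop_id` STRING,
  `age` STRING,
  `area` STRING,
  -- The upsert key; assumed here to be the GROUP BY columns of the query.
  PRIMARY KEY (`shop_id`, `age`, `area`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_visit_2',
  'properties.bootstrap.servers' = 'ip:9092',
  'key.format' = 'avro',    -- format of the Kafka record key
  'value.format' = 'avro'   -- format of the Kafka record value
);
```

With such a sink, updates produced by the aggregation are written as upserts keyed by the primary key, and retractions become tombstone (null-value) records.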
In reply to this post by xiao cai
Dear Leonard Xu:
I will follow that issue. Many thanks for the answer.