As shown below, I want to write aggregation results directly to Kafka with SQL. The version is 1.11. Is there any way to solve this, or is converting to a DataStream the only option?
Thanks.

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.mvp_rtdwb_user_business' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[dt, user_id], select=[dt, user_id, SUM($f2) AS text_feed_count, SUM($f3) AS picture_feed_count, SUM($f4) AS be_comment_forward_user_count, SUM($f5) AS share_link_count, SUM($f6) AS share_music_count, SUM($f7) AS share_video_count, SUM($f8) AS follow_count, SUM($f9) AS direct_post_count, SUM($f10) AS comment_post_count, SUM($f11) AS comment_count, SUM($f12) AS fans_count, MAX(event_time) AS event_time])
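A minimal sketch of the kind of 1.11 job that raises this exception (schema heavily simplified; topic and broker names are assumptions): the plain 'json'-format Kafka sink is append-only, while the GROUP BY produces an updating result.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class AggToKafkaRepro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Dummy append-only source so the sketch is self-contained
        tEnv.executeSql(
            "CREATE TABLE source_events (" +
            "  dt STRING, user_id BIGINT, text_feed_count BIGINT" +
            ") WITH ('connector' = 'datagen')");

        // Append-only Kafka sink: the 'json' format cannot consume update changes
        tEnv.executeSql(
            "CREATE TABLE mvp_rtdwb_user_business (" +
            "  dt STRING, user_id BIGINT, text_feed_count BIGINT" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'user_business'," +                         // assumed topic
            "  'properties.bootstrap.servers' = 'localhost:9092'," + // assumed broker
            "  'format' = 'json')");

        // The GROUP BY makes the result updating -> planner throws the TableException above
        tEnv.executeSql(
            "INSERT INTO mvp_rtdwb_user_business " +
            "SELECT dt, user_id, SUM(text_feed_count) " +
            "FROM source_events GROUP BY dt, user_id");
    }
}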
You can use the Canal or Debezium format to write to Kafka; those formats support update and delete messages.
--
Best,
Benchao Li
Sorry, the Canal and Debezium formats shipped in 1.11 do not support being used as a sink yet. Sink support is expected in 1.12.
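For reference, once sink support for these formats is available (expected in 1.12), only the format option of the sink table should need to change. A hedged sketch, reusing the tEnv from the sketch above (topic and broker names are assumptions):

// Hedged sketch for Flink 1.12+: a changelog format such as 'canal-json'
// (or 'debezium-json') lets the Kafka sink accept the update/delete rows
// produced by GROUP BY. Not available as a sink in 1.11.
tEnv.executeSql(
    "CREATE TABLE mvp_rtdwb_user_business (" +
    "  dt STRING, user_id BIGINT, text_feed_count BIGINT" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user_business'," +                         // assumed topic
    "  'properties.bootstrap.servers' = 'localhost:9092'," + // assumed broker
    "  'format' = 'canal-json')");                           // or 'debezium-json'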
OK, thanks. In that case, can a custom connector type be defined the way it was in 1.10?
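In the meantime, the DataStream fallback mentioned in the original question looks roughly like the sketch below on 1.11 (the datagen source, topic, broker address, and the decision to drop retraction rows are all assumptions for illustration):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Dummy append-only source; replace with the real source table
        tEnv.executeSql(
            "CREATE TABLE source_events (" +
            "  dt STRING, user_id BIGINT, text_feed_count BIGINT" +
            ") WITH ('connector' = 'datagen')");

        Table agg = tEnv.sqlQuery(
            "SELECT dt, user_id, SUM(text_feed_count) AS text_feed_count " +
            "FROM source_events GROUP BY dt, user_id");

        // toRetractStream emits Tuple2<Boolean, Row>: f0 = true for insert/update-after
        // rows, false for retractions. Dropping retractions gives upsert-style output
        // keyed by (dt, user_id); encode f0 instead if the consumer must see deletions.
        DataStream<String> out = tEnv
            .toRetractStream(agg, Row.class)
            .filter(t -> t.f0)
            .map(t -> t.f1.toString());

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker
        out.addSink(new FlinkKafkaProducer<>(
            "user_business", new SimpleStringSchema(), props));   // assumed topic

        env.execute("aggregate-to-kafka");
    }
}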