Error writing aggregated data to a Kafka table with SQL


Error writing aggregated data to a Kafka table with SQL

op
As shown below: I want to write aggregation results straight to Kafka with SQL, on version 1.11. Is there any way to solve this, or is converting to a DataStream the only option?


Thanks


Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.mvp_rtdwb_user_business' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[dt, user_id], select=[dt, user_id, SUM($f2) AS text_feed_count, SUM($f3) AS picture_feed_count, SUM($f4) AS be_comment_forward_user_count, SUM($f5) AS share_link_count, SUM($f6) AS share_music_count, SUM($f7) AS share_video_count, SUM($f8) AS follow_count, SUM($f9) AS direct_post_count, SUM($f10) AS comment_post_count, SUM($f11) AS comment_count, SUM($f12) AS fans_count, MAX(event_time) AS event_time])
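
For context, a minimal sketch of the kind of job that hits this error. The sink table name and the column names come from the plan in the exception; the source table ('user_events'), topic, and broker address are hypothetical, and only a few of the columns are shown:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class AggToKafkaJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Plain 'kafka' + 'json' is an append-only sink in 1.11.
        // Topic and broker address below are placeholders.
        tEnv.executeSql(
            "CREATE TABLE mvp_rtdwb_user_business (" +
            "  dt STRING," +
            "  user_id BIGINT," +
            "  text_feed_count BIGINT," +
            "  event_time TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'mvp_rtdwb_user_business'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'format' = 'json'" +
            ")");

        // The GROUP BY makes the query emit update changes, which the
        // append-only sink rejects at planning time with the exception above.
        tEnv.executeSql(
            "INSERT INTO mvp_rtdwb_user_business " +
            "SELECT dt, user_id, SUM(text_feed) AS text_feed_count, " +
            "       MAX(event_time) AS event_time " +
            "FROM user_events " +
            "GROUP BY dt, user_id");
    }
}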

Re: Error writing aggregated data to a Kafka table with SQL

Benchao Li
You can write to Kafka with the Canal or Debezium format; those support update and delete messages.
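
In DDL terms, roughly like this (as the next message points out, 1.11 only supports these formats on the source side, so this is really 1.12+ syntax; connector options are placeholders):

// Same sink table as in the original job, but with a changelog format so
// update and delete messages can be encoded. Only works as a sink from
// Flink 1.12 on; topic and broker address are placeholders.
tEnv.executeSql(
    "CREATE TABLE mvp_rtdwb_user_business (" +
    "  dt STRING," +
    "  user_id BIGINT," +
    "  text_feed_count BIGINT," +
    "  event_time TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'mvp_rtdwb_user_business'," +
    "  'properties.bootstrap.servers' = 'kafka:9092'," +
    "  'format' = 'debezium-json'" +  // or 'canal-json'
    ")");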

op <[hidden email]> wrote on Wed, Jul 29, 2020 at 11:59 AM:

> As shown below: I want to write aggregation results straight to Kafka with SQL, on version 1.11. Is there any way to solve this, or is converting to a DataStream the only option?
>
>
> Thanks
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Table sink 'default_catalog.default_database.mvp_rtdwb_user_business'
> doesn't support consuming update changes which is produced by node
> GroupAggregate(groupBy=[dt, user_id], select=[dt, user_id, SUM($f2) AS
> text_feed_count, SUM($f3) AS picture_feed_count, SUM($f4) AS
> be_comment_forward_user_count, SUM($f5) AS share_link_count, SUM($f6) AS
> share_music_count, SUM($f7) AS share_video_count, SUM($f8) AS follow_count,
> SUM($f9) AS direct_post_count, SUM($f10) AS comment_post_count, SUM($f11)
> AS comment_count, SUM($f12) AS fans_count, MAX(event_time) AS event_time])



--

Best,
Benchao Li

Re: Error writing aggregated data to a Kafka table with SQL

Jark
Sorry, the Canal and Debezium formats shipped in 1.11 don't support the sink side yet. That is expected in 1.12.
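
Until then, the DataStream conversion the original poster mentioned is one workaround on 1.11: turn the aggregate into a retract stream and write it out with the Kafka producer directly. A rough sketch (tEnv is the StreamTableEnvironment from the job above; topic, broker, and the decision to drop retraction records are all illustrative):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

Table agg = tEnv.sqlQuery(
    "SELECT dt, user_id, SUM(text_feed) AS text_feed_count, " +
    "       MAX(event_time) AS event_time " +
    "FROM user_events GROUP BY dt, user_id");

// Each element is (true, row) for an insert/update and (false, row) for a retraction.
DataStream<Tuple2<Boolean, Row>> retract = tEnv.toRetractStream(agg, Row.class);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder

retract
    .filter(t -> t.f0)               // keep only accumulate messages (simplification)
    .map(t -> t.f1.toString())       // serialize however fits your downstream
    .returns(Types.STRING)
    .addSink(new FlinkKafkaProducer<>("mvp_rtdwb_user_business",
                                      new SimpleStringSchema(), props));

Note that dropping the retraction (false) records turns the changelog into a stream of upserts keyed by (dt, user_id), so whatever consumes the topic has to tolerate repeated keys per group.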

On Wed, 29 Jul 2020 at 12:51, Benchao Li <[hidden email]> wrote:

> You can write to Kafka with the Canal or Debezium format; those support update and delete messages.
>
> op <[hidden email]> wrote on Wed, Jul 29, 2020 at 11:59 AM:
>
> > As shown below: I want to write aggregation results straight to Kafka with SQL, on version 1.11. Is there any way to solve this, or is converting to a DataStream the only option?
> >
> >
> > Thanks
> >
> >
> > Exception in thread "main" org.apache.flink.table.api.TableException:
> > Table sink 'default_catalog.default_database.mvp_rtdwb_user_business'
> > doesn't support consuming update changes which is produced by node
> > GroupAggregate(groupBy=[dt, user_id], select=[dt, user_id, SUM($f2) AS
> > text_feed_count, SUM($f3) AS picture_feed_count, SUM($f4) AS
> > be_comment_forward_user_count, SUM($f5) AS share_link_count, SUM($f6) AS
> > share_music_count, SUM($f7) AS share_video_count, SUM($f8) AS
> follow_count,
> > SUM($f9) AS direct_post_count, SUM($f10) AS comment_post_count, SUM($f11)
> > AS comment_count, SUM($f12) AS fans_count, MAX(event_time) AS
> event_time])
>
>
>
> --
>
> Best,
> Benchao Li
>

Re: Error writing aggregated data to a Kafka table with SQL

op
OK, thanks. In that case, can we define a custom connector type the way we could in 1.10?


------------------ Original message ------------------
From: "user-zh" <[hidden email]>;
Sent: Thursday, Jul 30, 2020, 10:39 AM
To: "user-zh" <[hidden email]>;

Subject: Re: Error writing aggregated data to a Kafka table with SQL



Sorry, the Canal and Debezium formats shipped in 1.11 don't support the sink side yet. That is expected in 1.12.

On Wed, 29 Jul 2020 at 12:51, Benchao Li <[hidden email]> wrote:

> You can write to Kafka with the Canal or Debezium format; those support update and delete messages.
>
> op <[hidden email]> wrote on Wed, Jul 29, 2020 at 11:59 AM:
>
> > As shown below: I want to write aggregation results straight to Kafka with SQL, on version 1.11. Is there any way to solve this, or is converting to a DataStream the only option?
> >
> >
> > Thanks
> >
> >
> > Exception in thread "main" org.apache.flink.table.api.TableException:
> > Table sink 'default_catalog.default_database.mvp_rtdwb_user_business'
> > doesn't support consuming update changes which is produced by node
> > GroupAggregate(groupBy=[dt, user_id], select=[dt, user_id, SUM($f2) AS
> > text_feed_count, SUM($f3) AS picture_feed_count, SUM($f4) AS
> > be_comment_forward_user_count, SUM($f5) AS share_link_count, SUM($f6) AS
> > share_music_count, SUM($f7) AS share_video_count, SUM($f8) AS follow_count,
> > SUM($f9) AS direct_post_count, SUM($f10) AS comment_post_count, SUM($f11)
> > AS comment_count, SUM($f12) AS fans_count, MAX(event_time) AS event_time])
>
>
>
> --
>
> Best,
> Benchao Li
>