Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

nobleyd
看起来不应该,你这种写法表达的就是1s一次写mysql,不会多次。每条消息触发计算是没问题的,但窗口最终输出是1s窗口结束之后才会输出的吧。

[hidden email] <[hidden email]> 于2020年8月19日周三 下午6:27写道:

>
> 接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
>
> keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
> ListAggregate()).addSink(new TemplateMySQLSink());
>
>
> ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
>
> 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
>
> 有什么方式让一个窗口只做一次 aggregate 操作吗?
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
>
Reply | Threaded
Open this post in threaded view
|

Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

wanglei2@geekplus.com
&nbsp;我也很纳闷。我在 TemplateMySQLSink&nbsp;invoke 方法的 log 里确实是每条记录都会触发。
&nbsp;
------------------&nbsp;Original&nbsp;------------------
From: &nbsp;"赵一旦"<[hidden email]&gt;;
Date: &nbsp;Wed, Aug 19, 2020 06:41 PM
To: &nbsp;"user-zh"<[hidden email]&gt;;

Subject: &nbsp;Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

&nbsp;

看起来不应该,你这种写法表达的就是1s一次写mysql,不会多次。每条消息触发计算是没问题的,但窗口最终输出是1s窗口结束之后才会输出的吧。

[hidden email] <[hidden email]&gt; 于2020年8月19日周三 下午6:27写道:

&gt;
&gt; 接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
&gt;
&gt; keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
&gt; ListAggregate()).addSink(new TemplateMySQLSink());
&gt;
&gt;
&gt; ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
&gt;
&gt; 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
&gt;
&gt; 有什么方式让一个窗口只做一次 aggregate 操作吗?
&gt;
&gt; 谢谢,
&gt; 王磊
&gt;
&gt;
&gt;
&gt; [hidden email]
&gt;
&gt;
Reply | Threaded
Open this post in threaded view
|

Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

wanglei2@geekplus.com
&nbsp;我用的是flink 最新版 : flink-1.11.1
&nbsp;
------------------&nbsp;Original&nbsp;------------------
From: &nbsp;"王磊2"<[hidden email]&gt;;
Date: &nbsp;Wed, Aug 19, 2020 07:24 PM
To: &nbsp;"user-zh"<[hidden email]&gt;; "hinobleyd"<[hidden email]&gt;;

Subject: &nbsp;Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

&nbsp;

&amp;nbsp;我也很纳闷。我在 TemplateMySQLSink&amp;nbsp;invoke 方法的 log 里确实是每条记录都会触发。
&amp;nbsp;
------------------&amp;nbsp;Original&amp;nbsp;------------------
From: &amp;nbsp;"赵一旦"<[hidden email]&amp;gt;;
Date: &amp;nbsp;Wed, Aug 19, 2020 06:41 PM
To: &amp;nbsp;"user-zh"<[hidden email]&amp;gt;;

Subject: &amp;nbsp;Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

&amp;nbsp;

看起来不应该,你这种写法表达的就是1s一次写mysql,不会多次。每条消息触发计算是没问题的,但窗口最终输出是1s窗口结束之后才会输出的吧。

[hidden email] <[hidden email]&amp;gt; 于2020年8月19日周三 下午6:27写道:

&amp;gt;
&amp;gt; 接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
&amp;gt;
&amp;gt; keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
&amp;gt; ListAggregate()).addSink(new TemplateMySQLSink());
&amp;gt;
&amp;gt;
&amp;gt; ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
&amp;gt;
&amp;gt; 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
&amp;gt;
&amp;gt; 有什么方式让一个窗口只做一次 aggregate 操作吗?
&amp;gt;
&amp;gt; 谢谢,
&amp;gt; 王磊
&amp;gt;
&amp;gt;
&amp;gt;
&amp;gt; [hidden email]
&amp;gt;
&amp;gt;
Reply | Threaded
Open this post in threaded view
|

Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

wanglei2@geekplus.com
&nbsp;是我自己搞错了。我 debug 日志的逻辑有问题。


&nbsp;
------------------&nbsp;Original&nbsp;------------------
From: &nbsp;"王磊2"<[hidden email]&gt;;
Date: &nbsp;Wed, Aug 19, 2020 07:45 PM
To: &nbsp;"user-zh"<[hidden email]&gt;;

Subject: &nbsp;Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

&nbsp;



&nbsp;我用的是flink 最新版 : flink-1.11.1
&nbsp;
------------------ Original ------------------
From: &nbsp;"王磊2"<[hidden email]&gt;;
Date: &nbsp;Wed, Aug 19, 2020 07:24 PM
To: &nbsp;"user-zh"<[hidden email]&gt;; "hinobleyd"<[hidden email]&gt;;

Subject: &nbsp;Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

&nbsp;

&amp;nbsp;我也很纳闷。我在 TemplateMySQLSink&amp;nbsp;invoke 方法的 log 里确实是每条记录都会触发。
&amp;nbsp;
------------------&amp;nbsp;Original&amp;nbsp;------------------
From: &amp;nbsp;"赵一旦"<[hidden email]&amp;gt;;
Date: &amp;nbsp;Wed, Aug 19, 2020 06:41 PM
To: &amp;nbsp;"user-zh"<[hidden email]&amp;gt;;

Subject: &amp;nbsp;Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

&amp;nbsp;

看起来不应该,你这种写法表达的就是1s一次写mysql,不会多次。每条消息触发计算是没问题的,但窗口最终输出是1s窗口结束之后才会输出的吧。

[hidden email] <[hidden email]&amp;gt; 于2020年8月19日周三 下午6:27写道:

&amp;gt;
&amp;gt; 接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
&amp;gt;
&amp;gt; keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
&amp;gt; ListAggregate()).addSink(new TemplateMySQLSink());
&amp;gt;
&amp;gt;
&amp;gt; ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
&amp;gt;
&amp;gt; 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
&amp;gt;
&amp;gt; 有什么方式让一个窗口只做一次 aggregate 操作吗?
&amp;gt;
&amp;gt; 谢谢,
&amp;gt; 王磊
&amp;gt;
&amp;gt;
&amp;gt;
&amp;gt; [hidden email]
&amp;gt;
&amp;gt;
Reply | Threaded
Open this post in threaded view
|

Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

yobdcdoll
In reply to this post by nobleyd
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).
*apply(...)*.addSink(new TemplateMySQLSink());

On Wed, Aug 19, 2020 at 6:27 PM [hidden email] <[hidden email]>
wrote:

>
> 接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
>
> keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
> ListAggregate()).addSink(new TemplateMySQLSink());
>
>
> ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
>
> 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
>
> 有什么方式让一个窗口只做一次 aggregate 操作吗?
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
>