看起来不应该,你这种写法表达的就是1s一次写mysql,不会多次。每条消息触发计算是没问题的,但窗口最终输出是1s窗口结束之后才会输出的吧。
[hidden email] <[hidden email]> 于2020年8月19日周三 下午6:27写道: > > 接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 > > keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new > ListAggregate()).addSink(new TemplateMySQLSink()); > > > ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 > > 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 > > 有什么方式让一个窗口只做一次 aggregate 操作吗? > > 谢谢, > 王磊 > > > > [hidden email] > > |
我也很纳闷。我在 TemplateMySQLSink invoke 方法的 log 里确实是每条记录都会触发。
------------------ Original ------------------ From: "赵一旦"<[hidden email]>; Date: Wed, Aug 19, 2020 06:41 PM To: "user-zh"<[hidden email]>; Subject: Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function 看起来不应该,你这种写法表达的就是1s一次写mysql,不会多次。每条消息触发计算是没问题的,但窗口最终输出是1s窗口结束之后才会输出的吧。 [hidden email] <[hidden email]> 于2020年8月19日周三 下午6:27写道: > > 接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 > > keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new > ListAggregate()).addSink(new TemplateMySQLSink()); > > > ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 > > 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 > > 有什么方式让一个窗口只做一次 aggregate 操作吗? > > 谢谢, > 王磊 > > > > [hidden email] > > |
我用的是flink 最新版 : flink-1.11.1
------------------ Original ------------------ From: "王磊2"<[hidden email]>; Date: Wed, Aug 19, 2020 07:24 PM To: "user-zh"<[hidden email]>; "hinobleyd"<[hidden email]>; Subject: Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function &nbsp;我也很纳闷。我在 TemplateMySQLSink&nbsp;invoke 方法的 log 里确实是每条记录都会触发。 &nbsp; ------------------&nbsp;Original&nbsp;------------------ From: &nbsp;"赵一旦"<[hidden email]&gt;; Date: &nbsp;Wed, Aug 19, 2020 06:41 PM To: &nbsp;"user-zh"<[hidden email]&gt;; Subject: &nbsp;Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function &nbsp; 看起来不应该,你这种写法表达的就是1s一次写mysql,不会多次。每条消息触发计算是没问题的,但窗口最终输出是1s窗口结束之后才会输出的吧。 [hidden email] <[hidden email]&gt; 于2020年8月19日周三 下午6:27写道: &gt; &gt; 接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 &gt; &gt; keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new &gt; ListAggregate()).addSink(new TemplateMySQLSink()); &gt; &gt; &gt; ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 &gt; &gt; 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 &gt; &gt; 有什么方式让一个窗口只做一次 aggregate 操作吗? &gt; &gt; 谢谢, &gt; 王磊 &gt; &gt; &gt; &gt; [hidden email] &gt; &gt; |
是我自己搞错了。我 debug 日志的逻辑有问题。
------------------ Original ------------------ From: "王磊2"<[hidden email]>; Date: Wed, Aug 19, 2020 07:45 PM To: "user-zh"<[hidden email]>; Subject: Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function 我用的是flink 最新版 : flink-1.11.1 ------------------ Original ------------------ From: "王磊2"<[hidden email]>; Date: Wed, Aug 19, 2020 07:24 PM To: "user-zh"<[hidden email]>; "hinobleyd"<[hidden email]>; Subject: Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function &nbsp;我也很纳闷。我在 TemplateMySQLSink&nbsp;invoke 方法的 log 里确实是每条记录都会触发。 &nbsp; ------------------&nbsp;Original&nbsp;------------------ From: &nbsp;"赵一旦"<[hidden email]&gt;; Date: &nbsp;Wed, Aug 19, 2020 06:41 PM To: &nbsp;"user-zh"<[hidden email]&gt;; Subject: &nbsp;Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function &nbsp; 看起来不应该,你这种写法表达的就是1s一次写mysql,不会多次。每条消息触发计算是没问题的,但窗口最终输出是1s窗口结束之后才会输出的吧。 [hidden email] <[hidden email]&gt; 于2020年8月19日周三 下午6:27写道: &gt; &gt; 接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 &gt; &gt; keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new &gt; ListAggregate()).addSink(new TemplateMySQLSink()); &gt; &gt; &gt; ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 &gt; &gt; 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 &gt; &gt; 有什么方式让一个窗口只做一次 aggregate 操作吗? &gt; &gt; 谢谢, &gt; 王磊 &gt; &gt; &gt; &gt; [hidden email] &gt; &gt; |
In reply to this post by nobleyd
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).
*apply(...)*.addSink(new TemplateMySQLSink()); On Wed, Aug 19, 2020 at 6:27 PM [hidden email] <[hidden email]> wrote: > > 接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 > > keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new > ListAggregate()).addSink(new TemplateMySQLSink()); > > > ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 > > 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 > > 有什么方式让一个窗口只做一次 aggregate 操作吗? > > 谢谢, > 王磊 > > > > [hidden email] > > |
Free forum by Nabble | Edit this page |