接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new ListAggregate()).addSink(new TemplateMySQLSink()); ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 有什么方式让一个窗口只做一次 aggregate 操作吗? 谢谢, 王磊 [hidden email] |
依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。
在 2020-08-19 18:27:25,"[hidden email]" <[hidden email]> 写道: > >接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 > >keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new ListAggregate()).addSink(new TemplateMySQLSink()); > > >ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 > >但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 > >有什么方式让一个窗口只做一次 aggregate 操作吗? > >谢谢, >王磊 > > > >[hidden email] > |
我改成下面这样还是同样的问题
我用 kafka 客户端连续发送了 100 条记录,连续执行了 100 次入库,每次入库的条数 +1 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process( new ProcessWindowFunction<ObjectNode, List<ObjectNode>, Tuple2<String, JsonNode>, TimeWindow>() { @Override public void process(Tuple2<String, JsonNode> stringJsonNodeTuple2, Context context, Iterable<ObjectNode> iterable, Collector<List<ObjectNode>> collector) throws Exception { List<ObjectNode> lists = new ArrayList<ObjectNode>(); for(ObjectNode node : iterable){ lists.add(node); } collector.collect(lists); } }).addSink(new TemplateMySQLSink()); ------------------ Original ------------------ From: "guoliang_wang1335"<[hidden email]>; Date: Wed, Aug 19, 2020 06:49 PM To: "user-zh"<[hidden email]>; Subject: Re:TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function 依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。 在 2020-08-19 18:27:25,"[hidden email]" <[hidden email]> 写道: > >接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 > >keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new ListAggregate()).addSink(new TemplateMySQLSink()); > > >ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 > >但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 > >有什么方式让一个窗口只做一次 aggregate 操作吗? > >谢谢, >王磊 > > > >[hidden email] > |
没搞懂你们说的每一条记录一次入库是如何观察出来的,1s的周期,那么快咋观察的。
aggregate就是每条记录触发一次,但最终输出到下游是1s周期到了才输出的。 王磊2 <[hidden email]> 于2020年8月19日周三 下午7:18写道: > 我改成下面这样还是同样的问题 > 我用 kafka 客户端连续发送了 100 条记录,连续执行了 100 次入库,每次入库的条数 +1 > > > > keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process( > new ProcessWindowFunction<ObjectNode, List<ObjectNode>, > Tuple2<String, JsonNode>, TimeWindow>() { > @Override > public void process(Tuple2<String, JsonNode> > stringJsonNodeTuple2, Context context, > Iterable<ObjectNode> iterable, > Collector<List<ObjectNode>> collector) throws Exception { > List<ObjectNode> lists = new ArrayList<ObjectNode>(); > for(ObjectNode node : iterable){ > lists.add(node); > } > collector.collect(lists); > } > }).addSink(new TemplateMySQLSink()); > ------------------ Original ------------------ > From: "guoliang_wang1335"<[hidden email]>; > Date: Wed, Aug 19, 2020 06:49 PM > To: "user-zh"<[hidden email]>; > > Subject: Re:TumblingProcessingTimeWindows 每来一条消息就会 trigger > aggregate function > > > > > 依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。 > > > > > 在 2020-08-19 18:27:25,"[hidden email]" <[hidden email]> > 写道: > > > >接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 > > > >keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new > ListAggregate()).addSink(new TemplateMySQLSink()); > > > > > >ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 > > > >但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 > > > >有什么方式让一个窗口只做一次 aggregate 操作吗? > > > >谢谢, > >王磊 > > > > > > > >[hidden email] > > |
我把 windows 时间设置成了 10s , 直接用命令行往kafka 发消息
head -100 filename |./bin/kafka-console-producer --broker-list 172.19.78.50:9092,172.19.78.51:9092,172.19.78.52:9092 --topic ods_artemis_out_order --property parse.key=true 在 TemplateMySQLSink log 日志,运行后在 日志中看到的 ------------------ Original ------------------ From: "赵一旦"<[hidden email]>; Date: Wed, Aug 19, 2020 10:04 PM To: "user-zh"<[hidden email]>; Subject: Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function 没搞懂你们说的每一条记录一次入库是如何观察出来的,1s的周期,那么快咋观察的。 aggregate就是每条记录触发一次,但最终输出到下游是1s周期到了才输出的。 王磊2 <[hidden email]> 于2020年8月19日周三 下午7:18写道: > 我改成下面这样还是同样的问题 > 我用 kafka 客户端连续发送了 100 条记录,连续执行了 100 次入库,每次入库的条数 +1 > > > > &nbsp;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process( > new ProcessWindowFunction<ObjectNode, List<ObjectNode&gt;, > Tuple2<String, JsonNode&gt;, TimeWindow&gt;() { > @Override > public void process(Tuple2<String, JsonNode&gt; > stringJsonNodeTuple2, Context context, > Iterable<ObjectNode&gt; iterable, > Collector<List<ObjectNode&gt;&gt; collector) throws Exception { > List<ObjectNode&gt; lists = new ArrayList<ObjectNode&gt;(); > for(ObjectNode node : iterable){ > lists.add(node); > } > collector.collect(lists); > } > }).addSink(new TemplateMySQLSink());&nbsp; > ------------------&nbsp;Original&nbsp;------------------ > From: &nbsp;"guoliang_wang1335"<[hidden email]&gt;; > Date: &nbsp;Wed, Aug 19, 2020 06:49 PM > To: &nbsp;"user-zh"<[hidden email]&gt;; > > Subject: &nbsp;Re:TumblingProcessingTimeWindows 每来一条消息就会 trigger > aggregate function > > &nbsp; > > > 依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。 > > > > > 在 2020-08-19 18:27:25,"[hidden email]" <[hidden email]&gt; > 写道: > &gt; > &gt;接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 > &gt; > &gt;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new > ListAggregate()).addSink(new TemplateMySQLSink()); > &gt; > &gt; > &gt;ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 > &gt; > &gt;但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 > &gt; > &gt;有什么方式让一个窗口只做一次 aggregate 操作吗? > &gt; > &gt;谢谢, > &gt;王磊 > &gt; > &gt; > &gt; > &gt;[hidden email] > &gt; |
Free forum by Nabble | Edit this page |