TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

wanglei2@geekplus.com

接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中

keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new ListAggregate()).addSink(new TemplateMySQLSink());


ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次

但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作

有什么方式让一个窗口只做一次 aggregate 操作吗?

谢谢,
王磊



[hidden email]

Reply | Threaded
Open this post in threaded view
|

Re:TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

guoliang_wang1335
依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。




在 2020-08-19 18:27:25,"[hidden email]" <[hidden email]> 写道:

>
>接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
>
>keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new ListAggregate()).addSink(new TemplateMySQLSink());
>
>
>ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
>
>但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
>
>有什么方式让一个窗口只做一次 aggregate 操作吗?
>
>谢谢,
>王磊
>
>
>
>[hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re:TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

wanglei2@geekplus.com
我改成下面这样还是同样的问题
我用 kafka 客户端连续发送了 100 条记录,连续执行了 100 次入库,每次入库的条数 +1


&nbsp;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(
    new ProcessWindowFunction<ObjectNode, List<ObjectNode&gt;, Tuple2<String, JsonNode&gt;, TimeWindow&gt;() {
        @Override
        public void process(Tuple2<String, JsonNode&gt; stringJsonNodeTuple2, Context context,
            Iterable<ObjectNode&gt; iterable, Collector<List<ObjectNode&gt;&gt; collector) throws Exception {
            List<ObjectNode&gt; lists = new ArrayList<ObjectNode&gt;();
            for(ObjectNode node : iterable){
                lists.add(node);
            }
            collector.collect(lists);
        }
    }).addSink(new TemplateMySQLSink());&nbsp;
------------------&nbsp;Original&nbsp;------------------
From: &nbsp;"guoliang_wang1335"<[hidden email]&gt;;
Date: &nbsp;Wed, Aug 19, 2020 06:49 PM
To: &nbsp;"user-zh"<[hidden email]&gt;;

Subject: &nbsp;Re:TumblingProcessingTimeWindows  每来一条消息就会 trigger aggregate function

&nbsp;

依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。




在 2020-08-19 18:27:25,"[hidden email]" <[hidden email]&gt; 写道:
&gt;
&gt;接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
&gt;
&gt;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new ListAggregate()).addSink(new TemplateMySQLSink());
&gt;
&gt;
&gt;ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
&gt;
&gt;但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
&gt;
&gt;有什么方式让一个窗口只做一次 aggregate 操作吗?
&gt;
&gt;谢谢,
&gt;王磊
&gt;
&gt;
&gt;
&gt;[hidden email]
&gt;
Reply | Threaded
Open this post in threaded view
|

Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

nobleyd
没搞懂你们说的每一条记录一次入库是如何观察出来的,1s的周期,那么快咋观察的。
aggregate就是每条记录触发一次,但最终输出到下游是1s周期到了才输出的。

王磊2 <[hidden email]> 于2020年8月19日周三 下午7:18写道:

> 我改成下面这样还是同样的问题
> 我用 kafka 客户端连续发送了 100 条记录,连续执行了 100 次入库,每次入库的条数 +1
>
>
>
> &nbsp;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(
>     new ProcessWindowFunction<ObjectNode, List<ObjectNode&gt;,
> Tuple2<String, JsonNode&gt;, TimeWindow&gt;() {
>         @Override
>         public void process(Tuple2<String, JsonNode&gt;
> stringJsonNodeTuple2, Context context,
>             Iterable<ObjectNode&gt; iterable,
> Collector<List<ObjectNode&gt;&gt; collector) throws Exception {
>             List<ObjectNode&gt; lists = new ArrayList<ObjectNode&gt;();
>             for(ObjectNode node : iterable){
>                 lists.add(node);
>             }
>             collector.collect(lists);
>         }
>     }).addSink(new TemplateMySQLSink());&nbsp;
> ------------------&nbsp;Original&nbsp;------------------
> From: &nbsp;"guoliang_wang1335"<[hidden email]&gt;;
> Date: &nbsp;Wed, Aug 19, 2020 06:49 PM
> To: &nbsp;"user-zh"<[hidden email]&gt;;
>
> Subject: &nbsp;Re:TumblingProcessingTimeWindows  每来一条消息就会 trigger
> aggregate function
>
> &nbsp;
>
>
> 依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。
>
>
>
>
> 在 2020-08-19 18:27:25,"[hidden email]" <[hidden email]&gt;
> 写道:
> &gt;
> &gt;接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
> &gt;
> &gt;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
> ListAggregate()).addSink(new TemplateMySQLSink());
> &gt;
> &gt;
> &gt;ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
> &gt;
> &gt;但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
> &gt;
> &gt;有什么方式让一个窗口只做一次 aggregate 操作吗?
> &gt;
> &gt;谢谢,
> &gt;王磊
> &gt;
> &gt;
> &gt;
> &gt;[hidden email]
> &gt;
Reply | Threaded
Open this post in threaded view
|

Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

wanglei2@geekplus.com
我把 windows 时间设置成了 10s , 直接用命令行往kafka 发消息


head -100 filename&nbsp; |./bin/kafka-console-producer --broker-list 172.19.78.50:9092,172.19.78.51:9092,172.19.78.52:9092 --topic ods_artemis_out_order --property&nbsp; parse.key=true&nbsp;


在 TemplateMySQLSink log 日志,运行后在 日志中看到的




------------------&nbsp;Original&nbsp;------------------
From: &nbsp;"赵一旦"<[hidden email]&gt;;
Date: &nbsp;Wed, Aug 19, 2020 10:04 PM
To: &nbsp;"user-zh"<[hidden email]&gt;;

Subject: &nbsp;Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

&nbsp;

没搞懂你们说的每一条记录一次入库是如何观察出来的,1s的周期,那么快咋观察的。
aggregate就是每条记录触发一次,但最终输出到下游是1s周期到了才输出的。

王磊2 <[hidden email]&gt; 于2020年8月19日周三 下午7:18写道:

&gt; 我改成下面这样还是同样的问题
&gt; 我用 kafka 客户端连续发送了 100 条记录,连续执行了 100 次入库,每次入库的条数 +1
&gt;
&gt;
&gt;
&gt; &amp;nbsp;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(
&gt;&nbsp;&nbsp;&nbsp;&nbsp; new ProcessWindowFunction<ObjectNode, List<ObjectNode&amp;gt;,
&gt; Tuple2<String, JsonNode&amp;gt;, TimeWindow&amp;gt;() {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public void process(Tuple2<String, JsonNode&amp;gt;
&gt; stringJsonNodeTuple2, Context context,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Iterable<ObjectNode&amp;gt; iterable,
&gt; Collector<List<ObjectNode&amp;gt;&amp;gt; collector) throws Exception {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; List<ObjectNode&amp;gt; lists = new ArrayList<ObjectNode&amp;gt;();
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; for(ObjectNode node : iterable){
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; lists.add(node);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; collector.collect(lists);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp;&nbsp; }).addSink(new TemplateMySQLSink());&amp;nbsp;
&gt; ------------------&amp;nbsp;Original&amp;nbsp;------------------
&gt; From: &amp;nbsp;"guoliang_wang1335"<[hidden email]&amp;gt;;
&gt; Date: &amp;nbsp;Wed, Aug 19, 2020 06:49 PM
&gt; To: &amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt;
&gt; Subject: &amp;nbsp;Re:TumblingProcessingTimeWindows&nbsp; 每来一条消息就会 trigger
&gt; aggregate function
&gt;
&gt; &amp;nbsp;
&gt;
&gt;
&gt; 依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。
&gt;
&gt;
&gt;
&gt;
&gt; 在 2020-08-19 18:27:25,"[hidden email]" <[hidden email]&amp;gt;
&gt; 写道:
&gt; &amp;gt;
&gt; &amp;gt;接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
&gt; &amp;gt;
&gt; &amp;gt;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
&gt; ListAggregate()).addSink(new TemplateMySQLSink());
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
&gt; &amp;gt;
&gt; &amp;gt;但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
&gt; &amp;gt;
&gt; &amp;gt;有什么方式让一个窗口只做一次 aggregate 操作吗?
&gt; &amp;gt;
&gt; &amp;gt;谢谢,
&gt; &amp;gt;王磊
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;[hidden email]
&gt; &amp;gt;