您好!最近刚开始学习flink,问一个关于trigger的问题:
如下的reduce操作: env.socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(value -> value.f0) .window(TumblingEventTimeWindows.of(Time.seconds(15))) .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } }); 使用的trigger是: @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } 然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录? 多谢指导! [hidden email] |
此处的trigger你可以理解成是计算结果的输出时机,你的reduce计算还是增量的哈。
[hidden email] <[hidden email]> 于2021年1月29日周五 上午11:27写道: > 您好!最近刚开始学习flink,问一个关于trigger的问题: > > 如下的reduce操作: > env.socketTextStream("localhost", 9999) > .flatMap(new Splitter()) > .keyBy(value -> value.f0) > .window(TumblingEventTimeWindows.of(Time.seconds(15))) > .reduce(new ReduceFunction<Tuple2<String, Integer>>() { > @Override > public Tuple2<String, Integer> reduce(Tuple2<String, > Integer> value1, Tuple2<String, Integer> value2) throws Exception { > return new Tuple2<>(value1.f0, value1.f1 + > value2.f1); > } > }); > > 使用的trigger是: > @Override > public Trigger<Object, TimeWindow> > getDefaultTrigger(StreamExecutionEnvironment env) { > return EventTimeTrigger.create(); > } > > > 然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录? > 多谢指导! > > > > > > [hidden email] > |
In reply to this post by xiaolailong@163.com
窗口没有结束,所有的数据都还在的
[hidden email] <[hidden email]> 于2021年1月29日周五 上午11:27写道: > 您好!最近刚开始学习flink,问一个关于trigger的问题: > > 如下的reduce操作: > env.socketTextStream("localhost", 9999) > .flatMap(new Splitter()) > .keyBy(value -> value.f0) > .window(TumblingEventTimeWindows.of(Time.seconds(15))) > .reduce(new ReduceFunction<Tuple2<String, Integer>>() { > @Override > public Tuple2<String, Integer> reduce(Tuple2<String, > Integer> value1, Tuple2<String, Integer> value2) throws Exception { > return new Tuple2<>(value1.f0, value1.f1 + > value2.f1); > } > }); > > 使用的trigger是: > @Override > public Trigger<Object, TimeWindow> > getDefaultTrigger(StreamExecutionEnvironment env) { > return EventTimeTrigger.create(); > } > > > 然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录? > 多谢指导! > > > > > > [hidden email] > |
Free forum by Nabble | Edit this page |