reduce函数的trigger问题

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

reduce函数的trigger问题

xiaolailong@163.com
您好!最近刚开始学习flink,问一个关于trigger的问题:

如下的reduce操作:
        env.socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });

使用的trigger是:
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}

然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录?
多谢指导!





[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: reduce函数的trigger问题

nobleyd
此处的trigger你可以理解成是计算结果的输出时机,你的reduce计算还是增量的哈。

[hidden email] <[hidden email]> 于2021年1月29日周五 上午11:27写道:

> 您好!最近刚开始学习flink,问一个关于trigger的问题:
>
> 如下的reduce操作:
>         env.socketTextStream("localhost", 9999)
>                 .flatMap(new Splitter())
>                 .keyBy(value -> value.f0)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(15)))
>                 .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
>                     @Override
>                     public Tuple2<String, Integer> reduce(Tuple2<String,
> Integer> value1, Tuple2<String, Integer> value2) throws Exception {
>                         return new Tuple2<>(value1.f0, value1.f1 +
> value2.f1);
>                     }
>                 });
>
> 使用的trigger是:
> @Override
> public Trigger<Object, TimeWindow>
> getDefaultTrigger(StreamExecutionEnvironment env) {
> return EventTimeTrigger.create();
> }
>
>
> 然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录?
> 多谢指导!
>
>
>
>
>
> [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re: reduce函数的trigger问题

nick
In reply to this post by xiaolailong@163.com
窗口没有结束,所有的数据都还在的

[hidden email] <[hidden email]> 于2021年1月29日周五 上午11:27写道:

> 您好!最近刚开始学习flink,问一个关于trigger的问题:
>
> 如下的reduce操作:
>         env.socketTextStream("localhost", 9999)
>                 .flatMap(new Splitter())
>                 .keyBy(value -> value.f0)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(15)))
>                 .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
>                     @Override
>                     public Tuple2<String, Integer> reduce(Tuple2<String,
> Integer> value1, Tuple2<String, Integer> value2) throws Exception {
>                         return new Tuple2<>(value1.f0, value1.f1 +
> value2.f1);
>                     }
>                 });
>
> 使用的trigger是:
> @Override
> public Trigger<Object, TimeWindow>
> getDefaultTrigger(StreamExecutionEnvironment env) {
> return EventTimeTrigger.create();
> }
>
>
> 然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录?
> 多谢指导!
>
>
>
>
>
> [hidden email]
>