Stream API window: .evictor(TimeEvictor.of(Time.seconds(0))).sum().print() throws NullPointerException


HunterXHunter
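The code is below. The evictor is configured to remove all data before the window fires, so in principle nothing should reach sum. While debugging, however, I found that sum still runs and emits null, which is passed on to print and causes a NullPointerException. I don't know whether this is a bug or a mistake on my side.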

        class A {
            String word;
            Long time;

            public A(String word, Long time) {
                this.word = word;
                this.time = time;
            }
        }

        streamEnv.fromElements(new A("a", 1L))
                .assignTimestampsAndWatermarks(
                        // The explicit <A> lets the lambda below see element.time.
                        WatermarkStrategy.<A>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((element, recordTimestamp) -> element.time)
                )
                .keyBy(x -> x.time)
                .map(x -> new Tuple2<>(x.word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy((KeySelector<Tuple2<String, Long>, String>) o -> o.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(20)))
                // Evicts every element before the window function runs.
                .evictor(TimeEvictor.of(Time.seconds(0)))
                .sum(1)
                .print();
        streamEnv.execute();
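
A rough illustration of why the window reaches sum with no elements, assuming TimeEvictor drops every element whose timestamp is at or below (largest timestamp in the window minus the configured duration). With a duration of 0 the cutoff equals the largest timestamp, so everything is removed. The sketch is plain Java without Flink; TimeEvictorSketch and evictBefore are hypothetical names, and the cutoff rule is an assumption about Flink's behavior, not a copy of its source.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for the assumed cutoff rule; not Flink's actual class.
class TimeEvictorSketch {
    /** Removes every timestamp <= (max timestamp - windowSizeMs) and returns what is left. */
    static List<Long> evictBefore(List<Long> timestamps, long windowSizeMs) {
        List<Long> remaining = new ArrayList<>(timestamps);
        if (remaining.isEmpty()) {
            return remaining;
        }
        long maxTimestamp = Long.MIN_VALUE;
        for (long ts : remaining) {
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        long cutoff = maxTimestamp - windowSizeMs;
        for (Iterator<Long> it = remaining.iterator(); it.hasNext(); ) {
            if (it.next() <= cutoff) {
                it.remove();
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        // One element with timestamp 1, as in the snippet above, and a duration of 0 ms.
        System.out.println(evictBefore(List.of(1L), 0L));         // prints []
        // A non-zero duration keeps the most recent elements.
        System.out.println(evictBefore(List.of(1L, 5L, 9L), 5L)); // prints [5, 9]
    }
}
```

Under that assumption, a single element is always evicted by Time.seconds(0), which matches the empty input observed in the debugger.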



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Stream API window: .evictor(TimeEvictor.of(Time.seconds(0))).sum().print() throws NullPointerException

HunterXHunter
I think this may be a bug (although it could also be intentional).
In EvictingWindowOperator.emitWindowContents(), at this call:
userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);
when the size of timestampedCollector is 0,
execution continues into ReduceApplyWindowFunction:
        public void apply(K k, W window, Iterable<T> input, Collector<R> out) throws Exception {
                T curr = null;
                for (T val : input) {
                        if (curr == null) {
                                curr = val;
                        } else {
                                curr = reduceFunction.reduce(curr, val);
                        }
                }
                wrappedFunction.apply(k, window, Collections.singletonList(curr), out);
        }

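With an empty input, curr is still null after the loop, so the call wrappedFunction.apply(k, window, Collections.singletonList(curr), out); passes along Collections.singletonList(null).
I think a check is needed here: since the input is empty, no null result should be emitted.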
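
The check suggested above could look roughly like the following. This is a standalone imitation of the reduce-then-forward logic in plain Java, not a patch against Flink: ReduceGuardSketch, reduceUnguarded and reduceGuarded are hypothetical names, and a returned list stands in for what would be handed to wrappedFunction.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

// Standalone imitation of the quoted logic; all names here are hypothetical.
class ReduceGuardSketch {
    /** Mirrors the quoted logic: always forwards a singleton list, even for empty input. */
    static <T> List<T> reduceUnguarded(Iterable<T> input, BinaryOperator<T> reduce) {
        T curr = null;
        for (T val : input) {
            curr = (curr == null) ? val : reduce.apply(curr, val);
        }
        return Collections.singletonList(curr); // holds null when input was empty
    }

    /** Guarded variant: forwards nothing when there was nothing to reduce. */
    static <T> List<T> reduceGuarded(Iterable<T> input, BinaryOperator<T> reduce) {
        T curr = null;
        for (T val : input) {
            curr = (curr == null) ? val : reduce.apply(curr, val);
        }
        return curr == null ? Collections.emptyList() : Collections.singletonList(curr);
    }

    public static void main(String[] args) {
        List<Long> empty = new ArrayList<>();
        System.out.println(reduceUnguarded(empty, Long::sum));             // prints [null]
        System.out.println(reduceGuarded(empty, Long::sum));               // prints []
        System.out.println(reduceGuarded(List.of(1L, 2L, 3L), Long::sum)); // prints [6]
    }
}
```

Whether skipping the call entirely is the right behavior for Flink is a design question for the maintainers; the sketch only shows that the null comes from reducing an empty input.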