我认为这可能是一个bug (当然也可能是故意这样设计的):
在 EvictingWindowOperator.emitWindowContents()位置:
userFunction.process(triggerContext.key, triggerContext.window,
processContext, projectedContents, timestampedCollector);
当timestampedCollector的size = 0时;
执行到 ReduceApplyWindowFunction部分:
public void apply(K k, W window, Iterable<T> input, Collector<R> out) throws
Exception {
T curr = null;
for (T val: input) {
if (curr == null) {
curr = val;
} else {
curr = reduceFunction.reduce(curr, val);
}
}
wrappedFunction.apply(k, window, Collections.singletonList(curr), out);
}
wrappedFunction.apply(k, window, Collections.singletonList(curr),
out);将会产生一个Collections.singletonList(null)结果。
我认为这里应该需要判断一下, 既然input进来是空的,就不应该输出一个null结果
--
Sent from:
http://apache-flink.147419.n8.nabble.com/