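I have a question. I am working with a processing-time window and need to fire on two dimensions, element count and elapsed time, so I wrote a custom Trigger.
Its onElement and onProcessingTime methods are as follows: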
    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        long timeout = System.currentTimeMillis() + 30000L;
        ctx.registerProcessingTimeTimer(timeout);
        System.out.println("new timeout " + timeout);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        System.out.println("current timeout " + time);
        return TriggerResult.FIRE_AND_PURGE;
    }
I use the trigger like this:
    DataStream<Tuple2<String, Integer>> dataStream = env
        .addSource(consumer)
        .flatMap(new CustomFlatMapFunction())
        .keyBy(0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
        .trigger(ProcessingTimeWithCountTrigger.of(5))
        .sum(1);
Why does the ctx.registerProcessingTimeTimer call in onElement appear to have no effect? onProcessingTime still fires on the originally configured 30-second schedule.
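
To make the intended behavior concrete, here is a minimal Flink-independent sketch of a count-or-timeout firing rule. It is only a model, not Flink code: the class CountOrTimeoutPolicy and its methods are hypothetical names, and it assumes the timeout restarts with every incoming element, which is one possible reading of the onElement method above.

```java
/** Hypothetical, Flink-independent model of a count-or-timeout firing rule. */
final class CountOrTimeoutPolicy {
    private final long maxCount;   // fire once this many elements are buffered
    private final long timeoutMs;  // fire once this long has passed since the last element
    private long count = 0;
    private long deadline = Long.MAX_VALUE; // no pending deadline while the batch is empty

    CountOrTimeoutPolicy(long maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /** Records one element seen at nowMs; returns true if the count threshold fires the batch. */
    boolean onElement(long nowMs) {
        count++;
        // Assumption: every element restarts the timeout, replacing the previous deadline.
        deadline = nowMs + timeoutMs;
        if (count >= maxCount) {
            reset();
            return true;
        }
        return false;
    }

    /** Checks the clock at nowMs; returns true if a non-empty batch has timed out. */
    boolean onTime(long nowMs) {
        if (count > 0 && nowMs >= deadline) {
            reset();
            return true;
        }
        return false;
    }

    /** Clears the batch so the next element starts a fresh count and deadline. */
    private void reset() {
        count = 0;
        deadline = Long.MAX_VALUE;
    }
}
```

In this model only a single deadline exists at any moment, because each new element overwrites the previous one. That is the behavior expected from the trigger; whether Flink timers behave the same way is exactly what the question is asking.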