自定义Trigger中ctx.registerProcessingTimeTimer无法更新onProcessingTime调用时间

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

自定义Trigger中ctx.registerProcessingTimeTimer无法更新onProcessingTime调用时间

吴治国
请问一下,我在做processing time window的时候因为需要按照数量和时间两个维度处理数据,所以自定义了Trigger方法

其中onElement方法和onProcessingTime方法如下:

@Override
public TriggerResult onElement(Object element, long timestamp, W
window, TriggerContext ctx) throws Exception {
    ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
    count.add(1L);
    long timeout = System.currentTimeMillis() + 30000L;
    ctx.registerProcessingTimeTimer(timeout);
    System.out.println("new timeout " + timeout);
    if (count.get() >= maxCount) {
        count.clear();
        return TriggerResult.FIRE_AND_PURGE;
    }
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, W window,
TriggerContext ctx) throws Exception {
    System.out.println("current timeout " + time);
    return TriggerResult.FIRE_AND_PURGE;
}

我的调用方式如下:

DataStream<Tuple2<String, Integer>> dataStream = env
        .addSource(consumer)
        .flatMap(new CustomFlatMapFunction())
        .keyBy(0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
        .trigger(ProcessingTimeWithCountTrigger.of(5))
        .sum(1);

请问为何我在onElement中调用的ctx.registerProcessingTimeTimer并没有作用呢?onProcessingTime还是按照原来设置的30秒时间触发了