|
您好!在使用keyed 窗口进行聚合操作时,使用了trigger触发器触发窗口操作,使用过程遇到几个问题,无法解决:
计算逻辑:对一批数据根据key进行分组聚合,使用滚动窗口,由于开窗时间是24小时,为了减少窗口的内存大小,触发器onElement()方法每次元素进入时,判读满足条件的数据触发窗口计算(FIRE_AND_PURGE),窗口计算时将窗口数据清除,将结果保存到窗口的valuestate中,当事件时间到达时,调用onEventTime()方法,触发计算。这个过程出现2个问题无法解决,描述如下:
1、经过keyby分组后,任意一条数据的事件时间满足watermark之后,会执行自定义触发器的onEventTime()方法,这条数据对应的key会执行窗口计算,其他key没有执行窗口计算,调试发现原因是:“窗口数据已经被清除,导致无法执行窗口计算” 。可问题是,我想执行所有key的窗口计算,并能判断这次计算是窗口最后一次计算,从valuestate取数据做一些汇总统计。简言之,窗口关闭时需要所有可以均参与计算,窗口计算函数能判断是否是最后(窗口时间到达)一次计算。
2、当执行触发器的onElement()方法或者onEventTime()方法时,除了返回TriggerResult外,我想在触发器执行时给窗口计算函数传递一些状态变量,这种问题如何解决呢?
备注:
自定义的触发器
class MyTrigger extends Trigger<RateInfo, TimeWindow>
自定义的窗口
class MyProcessFunction extends ProcessWindowFunction<RateInfo, String, String, TimeWindow>
分组聚合
WindowedStream<RateInfo, String, TimeWindow> trigger = timeStream
.keyBy(new KeySelector<RateInfo, String>() {
@Override
public String getKey(RateInfo value) throws Exception {
return value.getId();
}
})
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16)))
.trigger(new MyTrigger());
SingleOutputStreamOperator<String> resultStream = trigger.process(new MyProcessFunction());
最后感谢您!
-----------------
何立腾
中国 深圳
TEL:13737047391
2021年6月11日
|