Flink问题咨询-触发器传递参数到窗口及窗口关闭执行计算问

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Flink问题咨询-触发器传递参数到窗口及窗口关闭执行计算问

何立腾
您好!在使用keyed 窗口进行聚合操作时,使用了trigger触发器触发窗口操作,使用过程遇到几个问题,无法解决:


计算逻辑:对一批数据根据key进行分组聚合,使用滚动窗口,由于开窗时间是24小时,为了减少窗口的内存大小,触发器onElement()方法每次元素进入时,判读满足条件的数据触发窗口计算(FIRE_AND_PURGE),窗口计算时将窗口数据清除,将结果保存到窗口的valuestate中,当事件时间到达时,调用onEventTime()方法,触发计算。这个过程出现2个问题无法解决,描述如下:



1、经过keyby分组后,任意一条数据的事件时间满足watermark之后,会执行自定义触发器的onEventTime()方法,这条数据对应的key会执行窗口计算,其他key没有执行窗口计算,调试发现原因是:“窗口数据已经被清除,导致无法执行窗口计算”  。可问题是,我想执行所有key的窗口计算,并能判断这次计算是窗口最后一次计算,从valuestate取数据做一些汇总统计。简言之,窗口关闭时需要所有可以均参与计算,窗口计算函数能判断是否是最后(窗口时间到达)一次计算。


2、当执行触发器的onElement()方法或者onEventTime()方法时,除了返回TriggerResult外,我想在触发器执行时给窗口计算函数传递一些状态变量,这种问题如何解决呢?




备注:
自定义的触发器
class MyTrigger extends Trigger<RateInfo, TimeWindow&gt;&nbsp;
自定义的窗口
class MyProcessFunction extends ProcessWindowFunction<RateInfo, String, String, TimeWindow&gt;&nbsp;


分组聚合
&nbsp; &nbsp; &nbsp; WindowedStream<RateInfo, String, TimeWindow&gt; trigger = timeStream
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .keyBy(new KeySelector<RateInfo, String&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public String getKey(RateInfo value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return value.getId();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16)))
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .trigger(new MyTrigger());


&nbsp; &nbsp; &nbsp; &nbsp; SingleOutputStreamOperator<String&gt; resultStream = trigger.process(new MyProcessFunction());







最后感谢您!


-----------------
何立腾
中国&nbsp;深圳
TEL:13737047391
2021年6月11日