按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买如下env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)……………
.window(TumblingEventTimeWindows.of(Time.days(1))) .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))) .evictor(TimeEvictor.of(Time.seconds(0), true)) .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{ private var state: MapState[String,Boolean] = _ override def open override def process override def clear(ctx: Context): Unit = { state.clear() } } |
按我的理解,参考aggregate(AggregateFunction<T, ACC, V> aggFunction,
ProcessWindowFunction<V, R, K, W> windowFunction)方法, 窗口中的状态数据是存在某个聚合函数里的,processWindowFunction只是处理窗口的结果,需要通过context获取对应的窗口state来做清理。 x <[hidden email]> 于2020年8月25日周二 下午6:25写道: > > 按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买如下env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)…………… > .window(TumblingEventTimeWindows.of(Time.days(1))) > .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))) > .evictor(TimeEvictor.of(Time.seconds(0), true)) > .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{ > private var state: MapState[String,Boolean] = _ > override def open > override def process > override def clear(ctx: Context): Unit = { > state.clear() > } > } |
如果把窗口设短一点,比如10分钟,触发还是1分钟,.window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))),观察状态确实是10分钟清理一次,但是不是很理解为什么开一天的窗口,状态就不会清理,而是一直累积
------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年8月26日(星期三) 中午1:31 收件人: "[hidden email]"<[hidden email]>; 主题: Re: ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1 按我的理解,参考aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)方法, 窗口中的状态数据是存在某个聚合函数里的,processWindowFunction只是处理窗口的结果,需要通过context获取对应的窗口state来做清理。 x <[hidden email]> 于2020年8月25日周二 下午6:25写道: > > 按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买如下env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)…………… > .window(TumblingEventTimeWindows.of(Time.days(1))) > .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))) > .evictor(TimeEvictor.of(Time.seconds(0), true)) > .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{ > private var state: MapState[String,Boolean] = _ > override def open > override def process > override def clear(ctx: Context): Unit = { > state.clear() > } > } |
解决了,原来是时区的问题,窗口减去8小时就好了
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) ------------------ 原始邮件 ------------------ 发件人: "x" <[hidden email]>; 发送时间: 2020年8月27日(星期四) 下午2:00 收件人: "[hidden email]"<[hidden email]>; 主题: 回复: ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1 如果把窗口设短一点,比如10分钟,触发还是1分钟,.window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))),观察状态确实是10分钟清理一次,但是不是很理解为什么开一天的窗口,状态就不会清理,而是一直累积 ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年8月26日(星期三) 中午1:31 收件人: "[hidden email]"<[hidden email]>; 主题: Re: ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1 按我的理解,参考aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)方法, 窗口中的状态数据是存在某个聚合函数里的,processWindowFunction只是处理窗口的结果,需要通过context获取对应的窗口state来做清理。 x <[hidden email]> 于2020年8月25日周二 下午6:25写道: > > 按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买如下env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)…………… > .window(TumblingEventTimeWindows.of(Time.days(1))) > .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))) > .evictor(TimeEvictor.of(Time.seconds(0), true)) > .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{ > private var state: MapState[String,Boolean] = _ > override def open > override def process > override def clear(ctx: Context): Unit = { > state.clear() > } > } |
Free forum by Nabble | Edit this page |