ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1

x_gothicist
按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买如下env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)……………
.window(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
private var state: MapState[String,Boolean] = _
override def open
override def process
override def clear(ctx: Context): Unit = {
            state.clear()
}
}
Reply | Threaded
Open this post in threaded view
|

Re: ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1

shizk233
按我的理解,参考aggregate(AggregateFunction<T, ACC, V> aggFunction,
ProcessWindowFunction<V, R, K, W> windowFunction)方法,
窗口中的状态数据是存在某个聚合函数里的,processWindowFunction只是处理窗口的结果,需要通过context获取对应的窗口state来做清理。

x <[hidden email]> 于2020年8月25日周二 下午6:25写道:

>
> 按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买如下env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)……………
> .window(TumblingEventTimeWindows.of(Time.days(1)))
> .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
> .evictor(TimeEvictor.of(Time.seconds(0), true))
> .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
> private var state: MapState[String,Boolean] = _
> override def open
> override def process
> override def clear(ctx: Context): Unit = {
>             state.clear()
> }
> }
Reply | Threaded
Open this post in threaded view
|

回复: ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1

x_gothicist
如果把窗口设短一点,比如10分钟,触发还是1分钟,.window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))),观察状态确实是10分钟清理一次,但是不是很理解为什么开一天的窗口,状态就不会清理,而是一直累积


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年8月26日(星期三) 中午1:31
收件人:&nbsp;"[hidden email]"<[hidden email]&gt;;

主题:&nbsp;Re: ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1



按我的理解,参考aggregate(AggregateFunction<T, ACC, V&gt; aggFunction,
ProcessWindowFunction<V, R, K, W&gt; windowFunction)方法,
窗口中的状态数据是存在某个聚合函数里的,processWindowFunction只是处理窗口的结果,需要通过context获取对应的窗口state来做清理。

x <[hidden email]&gt; 于2020年8月25日周二 下午6:25写道:

&gt;
&gt; 按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买如下env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)……………
&gt; .window(TumblingEventTimeWindows.of(Time.days(1)))
&gt; .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
&gt; .evictor(TimeEvictor.of(Time.seconds(0), true))
&gt; .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
&gt; private var state: MapState[String,Boolean] = _
&gt; override def open
&gt; override def process
&gt; override def clear(ctx: Context): Unit = {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; state.clear()
&gt; }
&gt; }
Reply | Threaded
Open this post in threaded view
|

回复: ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1

x_gothicist
解决了,原来是时区的问题,窗口减去8小时就好了
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "x"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年8月27日(星期四) 下午2:00
收件人:&nbsp;"[hidden email]"<[hidden email]&gt;;

主题:&nbsp;回复: ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1





如果把窗口设短一点,比如10分钟,触发还是1分钟,.window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))),观察状态确实是10分钟清理一次,但是不是很理解为什么开一天的窗口,状态就不会清理,而是一直累积


------------------ 原始邮件 ------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年8月26日(星期三) 中午1:31
收件人:&nbsp;"[hidden email]"<[hidden email]&gt;;

主题:&nbsp;Re: ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1



按我的理解,参考aggregate(AggregateFunction<T, ACC, V&gt; aggFunction,
ProcessWindowFunction<V, R, K, W&gt; windowFunction)方法,
窗口中的状态数据是存在某个聚合函数里的,processWindowFunction只是处理窗口的结果,需要通过context获取对应的窗口state来做清理。

x <[hidden email]&gt; 于2020年8月25日周二 下午6:25写道:

&gt;
&gt; 按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买如下env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)……………
&gt; .window(TumblingEventTimeWindows.of(Time.days(1)))
&gt; .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
&gt; .evictor(TimeEvictor.of(Time.seconds(0), true))
&gt; .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
&gt; private var state: MapState[String,Boolean] = _
&gt; override def open
&gt; override def process
&gt; override def clear(ctx: Context): Unit = {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp; state.clear()
&gt; }
&gt; }