求助:如何处理数据不连续导致状态无法清理

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

求助:如何处理数据不连续导致状态无法清理

x_gothicist
版本为v1.10.1
使用AggregateFunction+ProcessWindowFunction的方式,进行实时统计,ProcessWindowFunction中涉及状态的累计运算,使用事件时间,按维度+日期分区,按分钟开窗,跨天需要将状态清除,避免状态越来越大。状态清除的逻辑,覆盖ProcessWindowFunction的clear方法,判断窗口开始时间是否为“23:59:00”,如下:override def clear(ctx: Context): Unit = {
  val dt = new SimpleDateFormat("HH:mm:00").format(ctx.window.getStart)
  if(dt.equals("23:59:00")){
    state.clear()遇到的一个问题是,开窗前,keyBy分区时,有的key对应的数据不连续,十分稀疏,可能会出现每天的最后一个窗口没有数据,导致无法触发状态清理逻辑,导致总状态数据越来越大的现象,请问各位老师,有什么好的办法,可以避免这种情况吗?
Reply | Threaded
Open this post in threaded view
|

Re: 求助:如何处理数据不连续导致状态无法清理

Congxian Qiu
Hi
     或许你可以使用 timer 来进行兜底,注册一个未来某个时间的 timer,然后 timer 触发的时候把 state 清理掉
Best,
Congxian


x <[hidden email]> 于2020年10月19日周一 下午2:55写道:

> 版本为v1.10.1
> 使用AggregateFunction+ProcessWindowFunction的方式,进行实时统计,ProcessWindowFunction中涉及状态的累计运算,使用事件时间,按维度+日期分区,按分钟开窗,跨天需要将状态清除,避免状态越来越大。状态清除的逻辑,覆盖ProcessWindowFunction的clear方法,判断窗口开始时间是否为“23:59:00”,如下:override
> def clear(ctx: Context): Unit = {
>   val dt = new SimpleDateFormat("HH:mm:00").format(ctx.window.getStart)
>   if(dt.equals("23:59:00")){
>
> state.clear()遇到的一个问题是,开窗前,keyBy分区时,有的key对应的数据不连续,十分稀疏,可能会出现每天的最后一个窗口没有数据,导致无法触发状态清理逻辑,导致总状态数据越来越大的现象,请问各位老师,有什么好的办法,可以避免这种情况吗?
Reply | Threaded
Open this post in threaded view
|

回复: 求助:如何处理数据不连续导致状态无法清理

x_gothicist
也就是说需要用KeyedProcessFunction代替ProcessWindowFunction.




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年10月19日(星期一) 下午3:28
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 求助:如何处理数据不连续导致状态无法清理



Hi
&nbsp;&nbsp;&nbsp;&nbsp; 或许你可以使用 timer 来进行兜底,注册一个未来某个时间的 timer,然后 timer 触发的时候把 state 清理掉
Best,
Congxian


x <[hidden email]&gt; 于2020年10月19日周一 下午2:55写道:

&gt; 版本为v1.10.1
&gt; 使用AggregateFunction+ProcessWindowFunction的方式,进行实时统计,ProcessWindowFunction中涉及状态的累计运算,使用事件时间,按维度+日期分区,按分钟开窗,跨天需要将状态清除,避免状态越来越大。状态清除的逻辑,覆盖ProcessWindowFunction的clear方法,判断窗口开始时间是否为“23:59:00”,如下:override
&gt; def clear(ctx: Context): Unit = {
&gt;&nbsp;&nbsp; val dt = new SimpleDateFormat("HH:mm:00").format(ctx.window.getStart)
&gt;&nbsp;&nbsp; if(dt.equals("23:59:00")){
&gt;
&gt; state.clear()遇到的一个问题是,开窗前,keyBy分区时,有的key对应的数据不连续,十分稀疏,可能会出现每天的最后一个窗口没有数据,导致无法触发状态清理逻辑,导致总状态数据越来越大的现象,请问各位老师,有什么好的办法,可以避免这种情况吗?