也就是说需要用KeyedProcessFunction代替ProcessWindowFunction.
------------------ 原始邮件 ------------------
发件人: "user-zh" <
[hidden email]>;
发送时间: 2020年10月19日(星期一) 下午3:28
收件人: "user-zh"<
[hidden email]>;
主题: Re: 求助:如何处理数据不连续导致状态无法清理
Hi
或许你可以使用 timer 来进行兜底,注册一个未来某个时间的 timer,然后 timer 触发的时候把 state 清理掉
Best,
Congxian
x <
[hidden email]> 于2020年10月19日周一 下午2:55写道:
> 版本为v1.10.1
> 使用AggregateFunction+ProcessWindowFunction的方式,进行实时统计,ProcessWindowFunction中涉及状态的累计运算,使用事件时间,按维度+日期分区,按分钟开窗,跨天需要将状态清除,避免状态越来越大。状态清除的逻辑,覆盖ProcessWindowFunction的clear方法,判断窗口开始时间是否为“23:59:00”,如下:override
> def clear(ctx: Context): Unit = {
> val dt = new SimpleDateFormat("HH:mm:00").format(ctx.window.getStart)
> if(dt.equals("23:59:00")){
>
> state.clear()遇到的一个问题是,开窗前,keyBy分区时,有的key对应的数据不连续,十分稀疏,可能会出现每天的最后一个窗口没有数据,导致无法触发状态清理逻辑,导致总状态数据越来越大的现象,请问各位老师,有什么好的办法,可以避免这种情况吗?