Question: state is cleared periodically, but checkpoint size keeps growing


Bo斷弦乄Jay
Hi all,

I have a question about managing state with a Flink ProcessFunction. In my stream I use flatMap to assign multiple endTime values to each record, and then keyBy on the record's key + endTime. In the ProcessFunction's open() I initialize a ValueState, in processElement() I register a timer to clean up the state, and in onTimer() I clear the state.

My understanding is that if the state is cleared on schedule, the checkpoint size should stay roughly stable. However, the job has now been running for 8 days and the checkpoint size keeps growing; it has reached 1.6 GB. I therefore suspect that something is wrong with my state-management logic. Any advice would be appreciated.

The job and ProcessFunction code is as follows:

    val stream = env.addSource(consumer).uid("DianDianUserAppViewCount_Source")
      .map(rawEvent => {
        try {
          val eventJson = parse(rawEvent)
          if ((eventJson \ "type").extract[String] == "track" &&
              (eventJson \ "project_id").extract[Int] == 6 &&
              (eventJson \ "properties" \ "$is_login_id").extract[Boolean] &&
              (eventJson \ "time").extract[Long] >= startFromTimestamp.toLong)
            Some(eventJson)
          else
            None
        } catch {
          case _ => None
        }
      }).uid("DianDianUserAppViewCount_Map")
      .filter(item => item.isDefined).uid("DianDianUserAppViewCount_Filter")
      .flatMap { item =>
        val timestamp = (item.get \ "time").extract[Long]
        // Reuse the sliding-window assigner to compute every window this event belongs to
        SlidingEventTimeWindows
          .of(Time.days(90), Time.days(1), Time.hours(-8))
          .assignWindows(null, timestamp, null)
          .map { case timeWindow =>
            RichAppViewEvent(
              s"${(item.get \ "distinct_id").extract[String]}_${timeWindow.getEnd}",
              (item.get \ "distinct_id").extract[String],
              (item.get \ "event").extract[String],
              timestamp,
              timeWindow.getEnd
            )
          }
      }.uid("DianDianUserAppViewCount_FlatMap")
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[RichAppViewEvent](Time.hours(1)) {
          override def extractTimestamp(element: RichAppViewEvent): Long = element.time
        }).uid("DianDianUserAppViewCount_Watermarker")
      .keyBy(_.key)
      .process(new ProcessAppViewCount).uid("DianDianUserAppViewCount_Process")
      .writeUsingOutputFormat(new HBaseOutputFormat).uid("DianDianUserAppViewCount_Sink")

    class ProcessAppViewCount
        extends KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)] {

      // 30 days in milliseconds: 1000 * 60 * 60 * 24 * 30
      private val thirtyDaysForTimestamp = 2592000000L
      private val dateFormat = new SimpleDateFormat("yyyyMMdd")
      private lazy val appViewCount: ValueState[Int] =
        getRuntimeContext.getState(
          new ValueStateDescriptor[Int]("AppViewCountState", classOf[Int]))

      override def processElement(
          value: RichAppViewEvent,
          ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context,
          out: Collector[(String, String, Int)]): Unit = {
        if (!isLate(value, ctx)) {
          // Go back 30 days from the window end time
          val beforeThirtyDaysStartTime = value.windowEndTime - thirtyDaysForTimestamp
          // Increment the count if the event falls within the last 30 days before
          // the window end and is an $AppViewScreen event
          var currentValue = appViewCount.value()
          if (value.time >= beforeThirtyDaysStartTime &&
              value.event.equals("$AppViewScreen")) {
            currentValue = currentValue + 1
            appViewCount.update(currentValue)
          }
          // Emit downstream
          out.collect(
            (value.distinctId, dateFormat.format(value.windowEndTime - 1), currentValue)
          )
          // Register the cleanup timer
          registerCleanupTimer(value, ctx)
        }
      }

      override def onTimer(
          timestamp: Long,
          ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#OnTimerContext,
          out: Collector[(String, String, Int)]): Unit = {
        if (appViewCount != null) {
          appViewCount.clear()
        }
      }

      /**
        * If the current watermark has already passed the end time of the window
        * assigned to this record, the record is treated as late and is not processed.
        * @param value
        * @param ctx
        * @return
        */
      private def isLate(
          value: RichAppViewEvent,
          ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context): Boolean = {
        ctx.timerService().currentWatermark() >= value.windowEndTime
      }

      /**
        * Use the window end time as the timestamp of the cleanup timer.
        * @param value
        * @param ctx
        */
      private def registerCleanupTimer(
          value: RichAppViewEvent,
          ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context) = {
        val cleanupTime = value.windowEndTime
        ctx.timerService().registerEventTimeTimer(cleanupTime)
      }
    }
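
One way to sanity-check the flatMap step is to work out, outside Flink, how many window end times a single event receives and how far in the future the corresponding cleanup timers fall. The following is a minimal standalone sketch, not the job's actual code: the object name `WindowEndSketch`, the helper `windowEnds`, and the sample timestamp are hypothetical, and the start-alignment formula is an assumption about how a sliding window assigner with size 90 days, slide 1 day, and offset -8 hours aligns windows for non-negative timestamps.

```scala
object WindowEndSketch {
  val HourMs: Long = 60L * 60 * 1000
  val DayMs: Long = 24L * HourMs

  // Assumed alignment rule (for non-negative timestamps): the most recent
  // window start is the timestamp rounded down to a slide boundary shifted
  // by `offset`; earlier windows step back by one slide while they still
  // contain the timestamp.
  def windowEnds(timestamp: Long, size: Long, slide: Long, offset: Long): Seq[Long] = {
    val lastStart = timestamp - (timestamp - offset + slide) % slide
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(_ > timestamp - size) // window must still cover the timestamp
      .map(_ + size)                   // window end = start + size
      .toList
  }

  def main(args: Array[String]): Unit = {
    val ts = 1546300800000L // arbitrary sample event time (2019-01-01T00:00:00Z)
    val ends = windowEnds(ts, 90 * DayMs, DayMs, -8 * HourMs)
    println(s"keys (window ends) per event: ${ends.size}")
    println(s"earliest cleanup timer, days after event: ${(ends.min - ts).toDouble / DayMs}")
    println(s"latest cleanup timer, days after event: ${(ends.max - ts).toDouble / DayMs}")
  }
}
```

If the assumption holds, every event fans out into 90 keys, each with its own ValueState entry and event-time timer, and the last of those timers only fires close to 90 days of event time after the event. Under that reading, a job that has run for 8 days would still be accumulating state for most of its keys, so a growing checkpoint would not by itself show that the cleanup logic is broken.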