Question: why does the checkpoint size keep growing even though state is cleaned up periodically?

陈赋赟
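Hi everyone. I've run into some questions while using a Flink ProcessFunction to manage state and would appreciate your advice. In the stream, I use flatMap to assign several endTimes to each record and then keyBy on the record's key + endTime. In the ProcessFunction I initialize a ValueState (in the code below via a lazy val, which is first accessed after open() has run), register a state-cleanup timer in processElement(), and clear the state in onTimer(). My understanding was that if the state is cleaned up on a timer, the checkpoint size should stay roughly constant. However, the job has now been running for 8 days and the checkpoint size keeps growing; it has reached 1.6 GB. So I suspect something is wrong with my state-management logic. Any pointers would be appreciated. The code is below.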
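The cleanup scheme described above relies on event-time timers, which fire only when the watermark reaches the timer's timestamp, not on a wall-clock schedule. Below is a toy, plain-Scala model of that behaviour, only a sketch: the class `ToyKeyedTimers` and its methods are hypothetical and are not Flink APIs. It assumes Flink's documented timer semantics, namely at most one timer per key and timestamp, and a timer firing once the watermark is at or past its timestamp.

```scala
import scala.collection.mutable

// Toy, single-threaded model of one keyed operator (NOT Flink's implementation):
// a per-key counter plus event-time timers, mimicking the documented semantics
// that there is at most one timer per (key, timestamp) and that a timer fires
// once the watermark is at or past its timestamp.
final class ToyKeyedTimers {
  val state: mutable.Map[String, Int] = mutable.Map.empty
  // (timestamp, key); the Set deduplicates timers registered twice for the same key and time
  val timers: mutable.SortedSet[(Long, String)] = mutable.SortedSet.empty
  private var watermark: Long = Long.MinValue

  // Like processElement() + registerCleanupTimer(): count the record and register
  // a cleanup timer at the window end, unless the record is late (as isLate() checks).
  def processElement(key: String, windowEnd: Long): Unit =
    if (watermark < windowEnd) {
      state(key) = state.getOrElse(key, 0) + 1
      timers += ((windowEnd, key))
    }

  // Advancing the watermark fires every timer with timestamp <= watermark;
  // the "onTimer" action clears that key's state.
  def advanceWatermark(newWatermark: Long): Unit = {
    watermark = math.max(watermark, newWatermark)
    val due = timers.takeWhile { case (ts, _) => ts <= watermark }.toList
    due.foreach { case (ts, key) =>
      timers -= ((ts, key))
      state -= key
    }
  }
}

object ToyKeyedTimersDemo {
  def main(args: Array[String]): Unit = {
    val op = new ToyKeyedTimers
    op.processElement("u1_100", 100L)
    op.processElement("u1_200", 200L)
    op.advanceWatermark(150L) // fires only the timer at 100
    println(op.state)         // only the u1_200 entry is left
  }
}
```

In this model a key's state lives exactly until the watermark passes that key's window end, so how "periodic" the cleanup is depends entirely on how far in the future those window ends lie.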


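Job and ProcessFunction code:

// Imports assumed by this snippet (json4s is assumed for parse, \ and extract)
import java.text.SimpleDateFormat

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Not shown in the post: env, consumer, startFromTimestamp, HBaseOutputFormat, an implicit
// json4s Formats for extract[...] (e.g. implicit val formats: Formats = DefaultFormats), and
// RichAppViewEvent, whose shape inferred from its usage below is assumed to be
// case class RichAppViewEvent(key: String, distinctId: String, event: String, time: Long, windowEndTime: Long)

val stream = env.addSource(consumer).uid("DianDianUserAppViewCount_Source")
    .map(rawEvent => {
      try {
        val eventJson = parse(rawEvent)

        if ((eventJson \ "type").extract[String] == "track" &&
            (eventJson \ "project_id").extract[Int] == 6 &&
            (eventJson \ "properties" \ "$is_login_id").extract[Boolean] &&
            (eventJson \ "time").extract[Long] >= startFromTimestamp.toLong)
          Some(eventJson)
        else
          None
      } catch {
        // Drop malformed records; unlike `case _`, this does not swallow fatal JVM errors
        case NonFatal(_) => None
      }
    }).uid("DianDianUserAppViewCount_Map")
    .filter(item => item.isDefined).uid("DianDianUserAppViewCount_Filter")
    .flatMap { item =>
      val timestamp = (item.get \ "time").extract[Long]
      // Emit one copy of the event per sliding window (90-day size, 1-day slide, -8h offset) containing it
      SlidingEventTimeWindows
        .of(Time.days(90), Time.days(1), Time.hours(-8))
        .assignWindows(null, timestamp, null)
        .asScala
        .map { timeWindow =>
          RichAppViewEvent(
            s"${(item.get \ "distinct_id").extract[String]}_${timeWindow.getEnd}",
            (item.get \ "distinct_id").extract[String],
            (item.get \ "event").extract[String],
            timestamp,
            timeWindow.getEnd
          )
        }
    }.uid("DianDianUserAppViewCount_FlatMap")
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[RichAppViewEvent](Time.hours(1)) {
      override def extractTimestamp(element: RichAppViewEvent): Long = element.time
    }).uid("DianDianUserAppViewCount_Watermarker")
    .keyBy(_.key)
    .process(new ProcessAppViewCount).uid("DianDianUserAppViewCount_Process")
    .writeUsingOutputFormat(new HBaseOutputFormat).uid("DianDianUserAppViewCount_Sink")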
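The flatMap step above fans each event out into one record per sliding window that contains it, and each record becomes its own `distinct_id_endTime` key. A minimal sketch of that fan-out follows, assuming the same window arithmetic Flink uses in `SlidingEventTimeWindows.assignWindows` / `TimeWindow.getWindowStartWithOffset`, re-implemented in plain Scala so it runs without Flink. `SlidingFanOut` and its methods are hypothetical names, and the example timestamp is arbitrary.

```scala
object SlidingFanOut {
  val HourMs: Long = 60L * 60 * 1000
  val DayMs: Long = 24 * HourMs

  // Start of the most recent window that contains `ts`
  // (same formula as Flink's TimeWindow.getWindowStartWithOffset for positive timestamps).
  def windowStart(ts: Long, offset: Long, slide: Long): Long =
    ts - (ts - offset + slide) % slide

  // End timestamps of every sliding window [start, start + size) that contains `ts`.
  def windowEnds(ts: Long, size: Long, slide: Long, offset: Long): List[Long] =
    Iterator
      .iterate(windowStart(ts, offset, slide))(_ - slide)
      .takeWhile(start => start > ts - size)
      .map(start => start + size)
      .toList

  // Keys built the same way as in the job, s"${distinct_id}_${windowEnd}",
  // using the job's parameters (90-day size, 1-day slide, -8h offset).
  def keysFor(distinctId: String, ts: Long): List[String] =
    windowEnds(ts, 90 * DayMs, DayMs, -8 * HourMs).map(end => s"${distinctId}_$end")

  def main(args: Array[String]): Unit = {
    val ts = 1546300800000L // 2019-01-01T00:00:00Z, an arbitrary example timestamp
    val ends = windowEnds(ts, 90 * DayMs, DayMs, -8 * HourMs)
    println(s"keys per event: ${keysFor("user42", ts).size}")
    println(s"first cleanup timer: ${(ends.min - ts) / HourMs} h after the event")
    println(s"last cleanup timer:  ${(ends.max - ts) / HourMs} h after the event")
  }
}
```

With these parameters every event yields 90 keys, and the latest window end, which is also that key's cleanup-timer timestamp in registerCleanupTimer, lies almost 90 days after the event.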


class ProcessAppViewCount extends KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)] {

  // 30 days in milliseconds: 1000 * 60 * 60 * 24 * 30
  private val thirtyDaysForTimestamp = 2592000000L
  private val dateFormat = new SimpleDateFormat("yyyyMMdd")

  // Initialized lazily on first access, i.e. after open() has run
  private lazy val appViewCount: ValueState[Int] =
    getRuntimeContext.getState(new ValueStateDescriptor[Int]("AppViewCountState", classOf[Int]))

  override def processElement(value: RichAppViewEvent,
                              ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context,
                              out: Collector[(String, String, Int)]): Unit = {

    if (!isLate(value, ctx)) {
      // Go back 30 days from the window end time
      val beforeThirtyDaysStartTime = value.windowEndTime - thirtyDaysForTimestamp

      // If the event lies within the 30 days before the window end and is an
      // $AppViewScreen event, increment the count (an empty state reads as 0)
      var currentValue = appViewCount.value()

      if (value.time >= beforeThirtyDaysStartTime &&
          value.event.equals("$AppViewScreen")) {
        currentValue = currentValue + 1
        appViewCount.update(currentValue)
      }

      // Emit downstream
      out.collect(
        (value.distinctId, dateFormat.format(value.windowEndTime - 1), currentValue)
      )

      // Register the cleanup timer
      registerCleanupTimer(value, ctx)
    }

  }

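  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#OnTimerContext,
                       out: Collector[(String, String, Int)]): Unit = {

    // The watermark has reached this key's window end: drop its counter.
    // (appViewCount is a lazy val, so it is never null and needs no null check.)
    appViewCount.clear()

  }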

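  /**
    * If the current watermark has already reached the end time of the window
    * assigned to this record, treat the record as late and skip it.
    * @param value the incoming record
    * @param ctx   the process-function context
    * @return true if the record is late
    */
  private def isLate(value: RichAppViewEvent, ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context): Boolean = {
    ctx.timerService().currentWatermark() >= value.windowEndTime
  }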

  /**
    * Use the window end time as the timestamp of the cleanup timer.
    * @param value the incoming record
    * @param ctx   the process-function context
    */
  private def registerCleanupTimer(value: RichAppViewEvent, ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context): Unit = {
    val cleanupTime = value.windowEndTime
    ctx.timerService().registerEventTimeTimer(cleanupTime)
  }
}
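Because registerCleanupTimer uses windowEndTime as the timer timestamp, and processElement registers a timer for every non-late record even when the 30-day check leaves the ValueState untouched, each key and its timer stay around until the watermark reaches that key's window end. Pending timers are themselves part of the checkpointed keyed state. A rough, day-granularity sketch under a deliberately simple, hypothetical workload (every day the same number of previously unseen users each send one event and never return; `PendingTimerGrowth` is an illustrative name) counts how many cleanup timers are still pending after a given number of days:

```scala
import scala.collection.mutable

// Day-granularity model with a hypothetical workload: every day `newUsersPerDay`
// previously unseen users each send one event and never return. Each event fans
// out to `windowsPerEvent` keys whose cleanup timers sit on the next
// `windowsPerEvent` day boundaries (the window ends), as registerCleanupTimer() does.
object PendingTimerGrowth {
  def pendingAfterDays(days: Int, newUsersPerDay: Int, windowsPerEvent: Int = 90): Long = {
    // timer timestamp (as a day-boundary index) -> number of pending timers
    val pending = mutable.Map.empty[Int, Long]
    for (day <- 0 until days) {
      // Events during `day` register timers at boundaries day+1 .. day+windowsPerEvent
      for (end <- (day + 1) to (day + windowsPerEvent))
        pending(end) = pending.getOrElse(end, 0L) + newUsersPerDay
      // The watermark then reaches boundary day+1: those timers fire and are removed
      pending.keys.filter(_ <= day + 1).toList.foreach(pending.remove)
    }
    pending.values.sum
  }

  def main(args: Array[String]): Unit =
    for (d <- Seq(1, 8, 30, 60, 89, 120))
      println(s"after $d days: ${pendingAfterDays(d, newUsersPerDay = 1)} pending timers")
}
```

Under this model the count keeps rising until the job has run for about 89 days and only then levels off; after 8 days it is about 17% of the plateau. Real traffic will differ, but the sketch suggests the state tied to this 90-day window may not stop growing before the window horizon has elapsed.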