各位好,本人在使用Flink processFunction来管理state数据的时候产生了一些疑问想要请教各位,我在流里使用flatmap为数据分配了多个endTime,然后根据数据的key+endTime去做keyby,在processFunction open()里我初始化了value state,并在processElement()里注册了清理state的Timer,在ontimer()里清除state,按照我的理解,如果state状态定时清理掉的话 那checkpoint size会一直保持在一个稳定的大小,但是流现在已经跑了8天,我看checkpoint size的大小在不断增长,现在已经增长到了1.6个G。所以有些怀疑是不是管理state的代码逻辑写的有问题,请各位指教一下。 代码如下
job以及processFunction代码 val stream = env.addSource(consumer).uid("DianDianUserAppViewCount_Source") .map(rawEvent => { try { val eventJson = parse(rawEvent) if ((eventJson \ "type").extract[String] == "track" && (eventJson \ "project_id").extract[Int] == 6 && (eventJson \ "properties" \ "$is_login_id").extract[Boolean] && (eventJson \ "time").extract[Long] >= startFromTimestamp.toLong) Some(eventJson) else None } catch { case _ => None } }).uid("DianDianUserAppViewCount_Map") .filter(item => item.isDefined).uid("DianDianUserAppViewCount_Filter") .flatMap{ item => val timestamp = (item.get \ "time").extract[Long] SlidingEventTimeWindows .of(Time.days(90), Time.days(1), Time.hours(-8)) .assignWindows(null, timestamp, null) .map{ case timeWindow => RichAppViewEvent( s"${(item.get \ "distinct_id").extract[String]}_${timeWindow.getEnd}", (item.get \ "distinct_id").extract[String], (item.get \ "event").extract[String], timestamp, timeWindow.getEnd ) } }.uid("DianDianUserAppViewCount_FlatMap") .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[RichAppViewEvent](Time.hours(1)) { override def extractTimestamp(element: RichAppViewEvent): Long = element.time }).uid("DianDianUserAppViewCount_Watermarker") .keyBy(_.key) .process(new ProcessAppViewCount).uid("DianDianUserAppViewCount_Process") .writeUsingOutputFormat(new HBaseOutputFormat).uid("DianDianUserAppViewCount_Sink") class ProcessAppViewCount extends KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)] { // 30天毫秒值 1000 * 60 * 60 * 24 * 30 private val thirtyDaysForTimestamp = 2592000000L private val dateFormat = new SimpleDateFormat("yyyyMMdd") private lazy val appViewCount: ValueState[Int] = getRuntimeContext.getState(new ValueStateDescriptor[Int]("AppViewCountState", classOf[Int])) override def processElement(value: RichAppViewEvent, ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context, out: Collector[(String, String, Int)]): Unit = { if (!isLate(value, ctx)){ // 根据窗口结束时间往前推30天 val beforeThirtyDaysStartTime = value.windowEndTime - thirtyDaysForTimestamp // 如果数据是窗口结束事件最近30天内并且有$AppViewScreen事件则累加 var currentValue = appViewCount.value() if (value.time >= beforeThirtyDaysStartTime && value.event.equals("$AppViewScreen")){ currentValue = currentValue + 1 appViewCount.update(currentValue) } // 发送到下游 out.collect( (value.distinctId, dateFormat.format(value.windowEndTime - 1), currentValue) ) // 设置cleanup timer registerCleanupTimer(value, ctx) } } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#OnTimerContext, out: Collector[(String, String, Int)]): Unit = { if (appViewCount != null){ appViewCount.clear() } } /** * 如果当前watermarker时间已经超过了该数据分配的窗口结束时间的话, * 则认为是迟到数据不进行处理。 * @param value * @param ctx * @return */ private def isLate(value: RichAppViewEvent, ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context): Boolean = { ctx.timerService().currentWatermark() >= value.windowEndTime } /** * 以窗口结束时间做为清理定时器的时间戳 * @param value * @param ctx */ private def registerCleanupTimer(value: RichAppViewEvent, ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context) = { val cleanupTime = value.windowEndTime ctx.timerService().registerEventTimeTimer(cleanupTime) } } |
Free forum by Nabble | Edit this page |