FLINK TABLE API custom aggregate function (UDAF): job fails to restore from checkpoint with a state serializer incompatibility (For heap backends, the new state serializer must not be incompatible)

orlando qi
Hi all,

       I am using the Flink Table API to do a group-by aggregation with a custom UDAF. The job runs correctly the first time it is submitted to the cluster, but when it is restored from a checkpoint it fails with the error below. With only built-in functions the restore works fine. How can I solve this?

java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
    at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:227)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:270)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    ... 9 more

groupBy: (risk_id, dev_only_id), select: (risk_id, dev_only_id,
UpdateColumn(org_name, evaluation_time_millis) AS TMP_586,
UpdateColumn(rule_weight_sum, evaluation_time_millis) AS TMP_581,
UpdateColumn(area_code, evaluation_time_millis) AS TMP_583, SUM(rule_risk)
AS TMP_580, UpdateColumn(ip, evaluation_time_millis) AS TMP_587,
UpdateColumn(evaluation_time_millis, evaluation_time_millis) AS TMP_588,
UpdateColumn(area_name, evaluation_time_millis) AS TMP_584,
UpdateColumn(evaluation_time, evaluation_time_millis) AS TMP_582,
UpdateColumn(org_code, evaluation_time_millis) AS TMP_585) -> select:
(risk_id, _UTF-16LE'''' AS risk_name, /(TMP_580, CAST(TMP_581)) AS
risk_value, TMP_582 AS evaluation_time, TMP_583 AS area_code, TMP_584 AS
area_name, TMP_585 AS org_code, TMP_586 AS org_name, dev_only_id, TMP_587
AS ip, CAST(TMP_581) AS rule_weight_sum, CAST(TMP_588) AS
evaluation_time_millis) -> to: Tuple2 -> Filter -> Map -> from: (risk_id,
risk_name, risk_value, evaluation_time, area_code, area_name, org_code,
org_name, dev_only_id, ip, risk_weight_sum, evaluation_time_millis) (3/6)
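
For reference, the failure is the heap state backend refusing to register the restored GroupAggProcessFunction state: the serializer derived for the accumulator in the new run is reported as incompatible with the one recorded in the checkpoint. Whether that serializer is a POJO serializer or a generic (Kryo) one depends on the TypeInformation Flink derives for the accumulator class; a quick, self-contained way to check it is sketched below (the accumulator class name is only a placeholder, not the real ACC type):

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation

// Placeholder accumulator class, standing in for the UDAF's real ACC type.
class MyAccumulator {
  var column: String = ""
  var timeMillis: java.lang.Long = 0L
}

object AccTypeCheck {
  def main(args: Array[String]): Unit = {
    // Reflective type extraction for the accumulator class.
    val accType = TypeInformation.of(classOf[MyAccumulator])

    // A "GenericType<...>" here means the accumulator is serialized with Kryo, whose
    // snapshot compatibility across restores is more fragile than a POJO serializer's.
    println(accType)
    println(accType.createSerializer(new ExecutionConfig))
  }
}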


--


祁豪兵

Re: FLINK TABLE API custom aggregate function (UDAF): job fails to restore from checkpoint with a state serializer incompatibility (For heap backends, the new state serializer must not be incompatible)

Jark
Administrator
Hi Qi,

Before restoring the job from the checkpoint, did you change the structure of the UDAF's ACC (accumulator)?

Also, if possible, it would be best to include the UDAF implementation and the SQL as well.
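
For illustration, the kind of change meant here is purely structural and can be very small; a hypothetical example:

// ACC class as it looked when the checkpoint was taken (hypothetical):
class Acc {
  var column: String = ""
  var timeMillis: java.lang.Long = 0L
}

// The same ACC after a structural change (hypothetical): one extra field is enough to change
// the serialized layout, so the serializer derived on restore no longer matches the one
// recorded in the checkpoint.
class AccChanged {
  var column: String = ""
  var timeMillis: java.lang.Long = 0L
  var source: String = ""
}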

Best,
Jark

> On Aug 23, 2019, at 11:08, orlando qi <[hidden email]> wrote:
>
>
> at


Re: FLINK TABLE API custom aggregate function (UDAF): job fails to restore from checkpoint with a state serializer incompatibility (For heap backends, the new state serializer must not be incompatible)

orlando qi
Nothing was changed; I was mainly testing whether restoring the job succeeds. The UDAF implementation and the Table API usage are below.

import java.lang.{Double => JDouble, Long => JLong, String => JString}

import com.vrv.bigdata.scala.datetime.DateTimeUtil
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.functions.AggregateFunction

/**
  * Accumulator for the custom aggregate function: the current column value and the
  * evaluation time at which it was observed.
  */
class NewColumn {
  var column: JString = ""
  var timeMillis: JLong = 0L
}

/**
  * Custom aggregate function: keeps the column value with the latest evaluation time.
  */
class UpdateColumn extends AggregateFunction[JString, NewColumn] {

  /**
    * Create the accumulator.
    * @return a new, empty accumulator
    */
  override def createAccumulator(): NewColumn = new NewColumn

  /**
    * Update the accumulator with a string column value.
    * @param acc        the accumulator
    * @param column     the new column value
    * @param timeMillis the evaluation time of the new value, in milliseconds
    */
  def accumulate(acc: NewColumn, column: JString, timeMillis: JLong): Unit = {

    val currentTimeMillis = DateTimeUtil.createCurrentTimeMillis
    if (acc.timeMillis > currentTimeMillis) {
      // The stored evaluation time is later than the current wall-clock time: the server
      // clock has changed, so take the incoming value.
      acc.column = column
      acc.timeMillis = timeMillis
    } else if (timeMillis >= acc.timeMillis) {
      // The incoming evaluation time is not older than the stored one, so the incoming
      // column value is the latest.
      acc.column = column
      acc.timeMillis = timeMillis
    }
    // Otherwise the records arrived out of order and the stored value is newer: do not update.
  }

  /**
    * Update the accumulator with a double column value.
    * @param acc        the accumulator
    * @param column     the new column value
    * @param timeMillis the evaluation time of the new value, in milliseconds
    */
  def accumulate(acc: NewColumn, column: JDouble, timeMillis: JLong): Unit = {

    val currentTimeMillis = DateTimeUtil.createCurrentTimeMillis
    if (acc.timeMillis > currentTimeMillis) {
      // Server clock changed: take the incoming value.
      acc.column = column.toString
      acc.timeMillis = timeMillis
    } else if (timeMillis >= acc.timeMillis) {
      // Incoming value is the latest.
      acc.column = column.toString
      acc.timeMillis = timeMillis
    }
    // Otherwise the stored value is newer: do not update.
  }

  /**
    * Update the accumulator with a long column value.
    * @param acc        the accumulator
    * @param column     the new column value
    * @param timeMillis the evaluation time of the new value, in milliseconds
    */
  def accumulate(acc: NewColumn, column: JLong, timeMillis: JLong): Unit = {

    val currentTimeMillis = DateTimeUtil.createCurrentTimeMillis
    if (acc.timeMillis > currentTimeMillis) {
      // Server clock changed: take the incoming value.
      acc.column = column.toString
      acc.timeMillis = timeMillis
    } else if (timeMillis >= acc.timeMillis) {
      // Incoming value is the latest.
      acc.column = column.toString
      acc.timeMillis = timeMillis
    }
    // Otherwise the stored value is newer: do not update.
  }

  /**
    * Retract a string column value. Retraction applies the same "keep the latest" logic
    * as accumulate.
    * @param acc        the accumulator
    * @param column     the retracted column value
    * @param timeMillis the evaluation time of the retracted value, in milliseconds
    */
  def retract(acc: NewColumn, column: JString, timeMillis: JLong): Unit = {

    val currentTimeMillis = DateTimeUtil.createCurrentTimeMillis
    if (acc.timeMillis > currentTimeMillis) {
      acc.column = column
      acc.timeMillis = timeMillis
    } else if (timeMillis >= acc.timeMillis) {
      acc.column = column
      acc.timeMillis = timeMillis
    }
  }

  /**
    * Retract a double column value. Retraction applies the same "keep the latest" logic
    * as accumulate.
    * @param acc        the accumulator
    * @param column     the retracted column value
    * @param timeMillis the evaluation time of the retracted value, in milliseconds
    */
  def retract(acc: NewColumn, column: JDouble, timeMillis: JLong): Unit = {

    val currentTimeMillis = DateTimeUtil.createCurrentTimeMillis
    if (acc.timeMillis > currentTimeMillis) {
      acc.column = column.toString
      acc.timeMillis = timeMillis
    } else if (timeMillis >= acc.timeMillis) {
      acc.column = column.toString
      acc.timeMillis = timeMillis
    }
  }

  /**
    * Retract a long column value. Retraction applies the same "keep the latest" logic
    * as accumulate.
    * @param acc        the accumulator
    * @param column     the retracted column value
    * @param timeMillis the evaluation time of the retracted value, in milliseconds
    */
  def retract(acc: NewColumn, column: JLong, timeMillis: JLong): Unit = {

    val currentTimeMillis = DateTimeUtil.createCurrentTimeMillis
    if (acc.timeMillis > currentTimeMillis) {
      acc.column = column.toString
      acc.timeMillis = timeMillis
    } else if (timeMillis >= acc.timeMillis) {
      acc.column = column.toString
      acc.timeMillis = timeMillis
    }
  }

  /**
    * Get the aggregation result.
    * @param acc the accumulator
    * @return the current column value
    */
  override def getValue(acc: NewColumn): JString = acc.column

  /**
    * Accumulator type. Note: TypeInformation.of on this Scala class (private fields behind
    * Scala accessors, no JavaBean getters/setters) most likely resolves to a generic,
    * Kryo-serialized type rather than a Flink POJO type.
    * @return the accumulator TypeInformation
    */
  override def getAccumulatorType: TypeInformation[NewColumn] =
    TypeInformation.of(classOf[NewColumn])


  /**
    * Result type of the function.
    * @return the result TypeInformation
    */
  override def getResultType: TypeInformation[JString] = Types.STRING
}

val update = new UpdateColumn()

// Compute the level-2 rule risk dynamic table from the level-3 child-rule risk stream.
val ruleRiskAssessmentTable = childRuleRisk.map(x => {

  TableChildRuleRiskAssessment(x.ruleId,
    x.childRuleId,
    x.childRuleName,
    x.childRuleRisk,
    x.evaluationTime,
    x.areaCode,
    x.areaName,
    x.orgCode,
    x.orgName,
    x.devOnlyId,
    x.ip,
    x.childRuleWeightSum,
    x.evaluationTimeMillis)

}).toTable(streamTableEnv)
  .groupBy(tableConf.getProperty("child.rule.risk.table.group.field"))
  .select('rule_id,
    'child_rule_id,
    update('child_rule_name, 'evaluation_time_millis) as 'child_rule_name,
    update('child_rule_risk, 'evaluation_time_millis).cast(Types.DOUBLE) as 'child_rule_risk,
    update('evaluation_time, 'evaluation_time_millis) as 'evaluation_time,
    update('area_code, 'evaluation_time_millis) as 'area_code,
    update('area_name, 'evaluation_time_millis) as 'area_name,
    update('org_code, 'evaluation_time_millis) as 'org_code,
    update('org_name, 'evaluation_time_millis) as 'org_name,
    'dev_only_id,
    update('ip, 'evaluation_time_millis) as 'ip,
    update('child_rule_weight_sum, 'evaluation_time_millis).cast(Types.DOUBLE) as 'child_rule_weight_sum,
    update('evaluation_time_millis, 'evaluation_time_millis).cast(Types.LONG) as 'evaluation_time_millis)
  .groupBy(tableConf.getProperty("rule.risk.table.group.field"))
  .select('''.toString as 'risk_id,
    'rule_id,
    '''.toString as 'rule_name,
    'child_rule_risk.sum / update('child_rule_weight_sum, 'evaluation_time_millis).cast(Types.DOUBLE) as 'rule_risk,
    update('evaluation_time, 'evaluation_time_millis) as 'evaluation_time,
    update('area_code, 'evaluation_time_millis) as 'area_code,
    update('area_name, 'evaluation_time_millis) as 'area_name,
    update('org_code, 'evaluation_time_millis) as 'org_code,
    update('org_name, 'evaluation_time_millis) as 'org_name,
    'dev_only_id,
    update('ip, 'evaluation_time_millis) as 'ip,
    update('child_rule_weight_sum, 'evaluation_time_millis).cast(Types.DOUBLE),
    '''.toString as 'rule_is_one_ticket_veto,
    '''.toString as 'deducting_reason,
    '''.toString as 'repair_method,
    update('evaluation_time_millis, 'evaluation_time_millis).cast(Types.LONG) as 'evaluation_time_millis)
  .toRetractStream[RuleRiskAssessment]
  .filter(_._1)
  .map(x => {

    TableRuleRiskAssessment(x._2.riskId,
      x._2.ruleId,
      x._2.ruleName,
      if(x._2.ruleRisk <= 1.0) x._2.ruleRisk else 1.0,
      x._2.evaluationTime,
      x._2.areaCode,
      x._2.areaName,
      x._2.orgCode,
      x._2.orgName,
      x._2.devOnlyId,
      x._2.ip,
      x._2.ruleWeightSum,
      x._2.ruleIsOneTicketVeto,
      x._2.evaluationTimeMillis)

  }).toTable(streamTableEnv)
  .groupBy(tableConf.getProperty("rule.risk.table.group.field.select.group.field"))
  .select('risk_id,
    'rule_id,
    update('rule_name, 'evaluation_time_millis) as 'rule_name,
    update('rule_risk, 'evaluation_time_millis).cast(Types.DOUBLE) as 'rule_risk,
    update('evaluation_time, 'evaluation_time_millis) as 'evaluation_time,
    update('area_code, 'evaluation_time_millis) as 'area_code,
    update('area_name, 'evaluation_time_millis) as 'area_name,
    update('org_code, 'evaluation_time_millis) as 'org_code,
    update('org_name, 'evaluation_time_millis) as 'org_name,
    'dev_only_id,
    update('ip, 'evaluation_time_millis) as 'ip,
    update('rule_weight_sum, 'evaluation_time_millis).cast(Types.DOUBLE) as 'rule_weight_sum,
    '''.toString as 'rule_is_one_ticket_veto,
    '''.toString as 'deducting_reason,
    '''.toString as 'repair_method,
    update('evaluation_time_millis, 'evaluation_time_millis).cast(Types.LONG) as 'evaluation_time_millis)

// Convert the level-2 rule risk dynamic table into a level-2 rule risk stream.
val ruleRiskAssessmentStream = ruleRiskAssessmentTable.toRetractStream[RuleRiskAssessment]
  .filter(_._1)
  .map{_._2}
  .map(x => {

    // Package the level-2 rule risk assessment data.
    Map("stream_rule_id" -> x.ruleId,
      "stream_rule_risk" -> x.ruleRisk.toString,
      "stream_evaluation_time" -> x.evaluationTime,
      "stream_area_code" -> x.areaCode,
      "stream_area_name" -> x.areaName,
      "stream_org_code" -> x.orgCode,
      "stream_org_name" -> x.orgName,
      "stream_dev_only_id" -> x.devOnlyId,
      "stream_ip" -> x.ip,
      "stream_deducting_reason" -> x.deductingReason,
      "stream_repair_method" -> x.repairMethod,
      "stream_evaluation_time_millis" -> x.evaluationTimeMillis.toString)
  })
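
A note on the accumulator above: NewColumn is a plain Scala class with var fields and no JavaBean accessors, so TypeInformation.of(classOf[NewColumn]) most likely resolves to a generic, Kryo-serialized type rather than a Flink POJO type. The sketch below shows a variant that Flink's type extraction can analyze as a POJO; it is an illustration under that assumption, not a verified fix, and switching the accumulator's serializer is itself an incompatible change for state already written, so it would only help for checkpoints taken after the change.

import java.lang.{Long => JLong, String => JString}

import scala.beans.BeanProperty

// Sketch: @BeanProperty generates getColumn/setColumn and getTimeMillis/setTimeMillis.
// With a public no-arg constructor and JavaBean-style accessors, Flink's type extraction
// can treat the class as a POJO, so the accumulator state uses the POJO serializer
// instead of falling back to Kryo.
class NewColumnPojo {
  @BeanProperty var column: JString = ""
  @BeanProperty var timeMillis: JLong = 0L
}

// Hypothetical usage in the UDAF: declare the accumulator type explicitly.
// override def getAccumulatorType: TypeInformation[NewColumnPojo] =
//   TypeInformation.of(classOf[NewColumnPojo])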


Jark Wu <[hidden email]> wrote on Mon, Aug 26, 2019, at 12:18 PM:

> Hi Qi,
>
> Before restoring the job from the checkpoint, did you change the structure of the UDAF's ACC (accumulator)?
>
> Also, if possible, it would be best to include the UDAF implementation and the SQL as well.
>
> Best,
> Jark
>
> > On Aug 23, 2019, at 11:08, orlando qi <[hidden email]> wrote:
> >
> >
> > at
>
>

--


祁豪兵