Hi all,
When I use the Flink Table API to implement a group-by aggregation with a custom UDAF, the job runs correctly the first time it is deployed to the cluster, but when the job is restored from a checkpoint it fails with the error below. With the built-in aggregate functions there is no problem. How should I fix this?

java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
    at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:227)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:270)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    ... 9 more

The failing operator's plan:

groupBy: (risk_id, dev_only_id),
select: (risk_id, dev_only_id, UpdateColumn(org_name, evaluation_time_millis) AS TMP_586, UpdateColumn(rule_weight_sum, evaluation_time_millis) AS TMP_581, UpdateColumn(area_code, evaluation_time_millis) AS TMP_583, SUM(rule_risk) AS TMP_580, UpdateColumn(ip, evaluation_time_millis) AS TMP_587, UpdateColumn(evaluation_time_millis, evaluation_time_millis) AS TMP_588, UpdateColumn(area_name, evaluation_time_millis) AS TMP_584, UpdateColumn(evaluation_time, evaluation_time_millis) AS TMP_582, UpdateColumn(org_code, evaluation_time_millis) AS TMP_585)
-> select: (risk_id, _UTF-16LE'''' AS risk_name, /(TMP_580, CAST(TMP_581)) AS risk_value, TMP_582 AS evaluation_time, TMP_583 AS area_code, TMP_584 AS area_name, TMP_585 AS org_code, TMP_586 AS org_name, dev_only_id, TMP_587 AS ip, CAST(TMP_581) AS rule_weight_sum, CAST(TMP_588) AS evaluation_time_millis)
-> to: Tuple2 -> Filter -> Map
-> from: (risk_id, risk_name, risk_value, evaluation_time, area_code, area_name, org_code, org_name, dev_only_id, ip, risk_weight_sum, evaluation_time_millis) (3/6)

--
祁豪兵
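A note on the stack trace: a StateMigrationException from HeapKeyedStateBackend means that the serializer snapshot restored from the checkpoint judged the newly created state serializer incompatible. A quick diagnostic (a sketch, not from the original report) is to print the TypeInformation that Flink extracts for the UDAF's accumulator class, for example in the Scala REPL; a GenericType result means the Kryo fallback, whose restore compatibility is fragile across runs:

import org.apache.flink.api.common.typeinfo.TypeInformation

// NewColumn is the UDAF accumulator class posted further down in this thread.
// "PojoType<...>" is fine; "GenericType<...>" means the Kryo fallback.
println(TypeInformation.of(classOf[NewColumn]))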
Hi Qi,
Did you modify the UDAF's accumulator (ACC) structure before restoring the job from the checkpoint?

Also, if possible, it would help if you could share the UDAF implementation and the SQL.

Best,
Jark
No, I didn't change anything; I was mainly testing whether restoring the job succeeds at all. Here are the UDAF implementation and the Table API code:
import java.lang.{Double => JDouble, Long => JLong, String => JString}
import com.vrv.bigdata.scala.datetime.DateTimeUtil
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.functions.AggregateFunction

/**
 * Accumulator for the custom aggregate: the latest column value and its
 * evaluation time.
 */
class NewColumn {
  var column: JString = ""
  var timeMillis: JLong = 0L
}

/**
 * Custom aggregate function: keeps the column value with the most recent
 * evaluation time.
 */
class UpdateColumn extends AggregateFunction[JString, NewColumn] {

  /** Creates the accumulator. */
  override def createAccumulator(): NewColumn = new NewColumn

  /**
   * Shared update logic for all accumulate/retract overloads.
   *
   * @param acc        the accumulator
   * @param column     the incoming column value
   * @param timeMillis the incoming evaluation time
   */
  private def updateAcc(acc: NewColumn, column: JString, timeMillis: JLong): Unit = {
    val currentTimeMillis = DateTimeUtil.createCurrentTimeMillis
    if (acc.timeMillis > currentTimeMillis) {
      // The stored evaluation time is later than the current wall-clock time,
      // which means the server clock has changed; take the incoming value.
      acc.column = column
      acc.timeMillis = timeMillis
    } else if (timeMillis >= acc.timeMillis) {
      // The incoming evaluation time is at least as recent as the stored one,
      // so the incoming column value is the latest.
      acc.column = column
      acc.timeMillis = timeMillis
    }
    // Otherwise the stored evaluation time is newer, i.e. records arrived out
    // of order; keep the stored value and do not update.
  }

  def accumulate(acc: NewColumn, column: JString, timeMillis: JLong): Unit =
    updateAcc(acc, column, timeMillis)

  def accumulate(acc: NewColumn, column: JDouble, timeMillis: JLong): Unit =
    updateAcc(acc, column.toString, timeMillis)

  def accumulate(acc: NewColumn, column: JLong, timeMillis: JLong): Unit =
    updateAcc(acc, column.toString, timeMillis)

  def retract(acc: NewColumn, column: JString, timeMillis: JLong): Unit =
    updateAcc(acc, column, timeMillis)

  def retract(acc: NewColumn, column: JDouble, timeMillis: JLong): Unit =
    updateAcc(acc, column.toString, timeMillis)

  def retract(acc: NewColumn, column: JLong, timeMillis: JLong): Unit =
    updateAcc(acc, column.toString, timeMillis)

  /** Returns the aggregate result: the latest column value. */
  override def getValue(acc: NewColumn): JString = acc.column

  override def getAccumulatorType: TypeInformation[NewColumn] =
    TypeInformation.of(classOf[NewColumn])

  override def getResultType: TypeInformation[JString] = Types.STRING
}
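One thing worth checking in the class above: NewColumn uses plain Scala vars, which compile to private fields with Scala-style accessors, so Flink's TypeExtractor may not recognize it as a POJO, and TypeInformation.of(classOf[NewColumn]) can silently fall back to GenericTypeInfo (Kryo). A hedged sketch of declaring the accumulator so it analyzes as a stable Flink POJO follows; this rests on the assumption that the Kryo fallback is the root cause, which the thread does not confirm:

import java.lang.{Long => JLong, String => JString}
import scala.beans.BeanProperty
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}

// Hypothetical variant of the accumulator: @BeanProperty generates
// getColumn/setColumn and getTimeMillis/setTimeMillis, which the
// TypeExtractor recognizes, so the class analyzes as a POJO and gets the
// checkpoint-stable PojoSerializer instead of Kryo.
class NewColumn {
  @BeanProperty var column: JString = ""
  @BeanProperty var timeMillis: JLong = 0L
}

object AccTypeCheck {
  def main(args: Array[String]): Unit = {
    // Types.POJO throws if NewColumn cannot be analyzed as a POJO, rather
    // than silently falling back to Kryo, so this doubles as a sanity check.
    val accType: TypeInformation[NewColumn] = Types.POJO(classOf[NewColumn])
    println(accType)
  }
}

With that in place, getAccumulatorType could return Types.POJO(classOf[NewColumn]) instead of TypeInformation.of(...). Note the caveat: changing the accumulator representation is itself a state-incompatible change, so it would only help for jobs started fresh, not for restoring the existing checkpoint.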
val update = new UpdateColumn()

// Build the second-level rule-risk dynamic table from the stream of
// third-level child-rule risks.
val ruleRiskAssessmentTable = childRuleRisk.map(x => {
    TableChildRuleRiskAssessment(x.ruleId, x.childRuleId, x.childRuleName,
      x.childRuleRisk, x.evaluationTime, x.areaCode, x.areaName, x.orgCode,
      x.orgName, x.devOnlyId, x.ip, x.childRuleWeightSum, x.evaluationTimeMillis)
  }).toTable(streamTableEnv)
  .groupBy(tableConf.getProperty("child.rule.risk.table.group.field"))
  .select('rule_id, 'child_rule_id,
    update('child_rule_name, 'evaluation_time_millis) as 'child_rule_name,
    update('child_rule_risk, 'evaluation_time_millis).cast(Types.DOUBLE) as 'child_rule_risk,
    update('evaluation_time, 'evaluation_time_millis) as 'evaluation_time,
    update('area_code, 'evaluation_time_millis) as 'area_code,
    update('area_name, 'evaluation_time_millis) as 'area_name,
    update('org_code, 'evaluation_time_millis) as 'org_code,
    update('org_name, 'evaluation_time_millis) as 'org_name,
    'dev_only_id,
    update('ip, 'evaluation_time_millis) as 'ip,
    update('child_rule_weight_sum, 'evaluation_time_millis).cast(Types.DOUBLE) as 'child_rule_weight_sum,
    update('evaluation_time_millis, 'evaluation_time_millis).cast(Types.LONG) as 'evaluation_time_millis)
  .groupBy(tableConf.getProperty("rule.risk.table.group.field"))
  .select('\''.toString as 'risk_id, 'rule_id, '\''.toString as 'rule_name,
    'child_rule_risk.sum / update('child_rule_weight_sum, 'evaluation_time_millis).cast(Types.DOUBLE) as 'rule_risk,
    update('evaluation_time, 'evaluation_time_millis) as 'evaluation_time,
    update('area_code, 'evaluation_time_millis) as 'area_code,
    update('area_name, 'evaluation_time_millis) as 'area_name,
    update('org_code, 'evaluation_time_millis) as 'org_code,
    update('org_name, 'evaluation_time_millis) as 'org_name,
    'dev_only_id,
    update('ip, 'evaluation_time_millis) as 'ip,
    update('child_rule_weight_sum, 'evaluation_time_millis).cast(Types.DOUBLE),
    '\''.toString as 'rule_is_one_ticket_veto,
    '\''.toString as 'deducting_reason,
    '\''.toString as 'repair_method,
    update('evaluation_time_millis, 'evaluation_time_millis).cast(Types.LONG) as 'evaluation_time_millis)
  .toRetractStream[RuleRiskAssessment]
  .filter(_._1)
  .map(x => {
    TableRuleRiskAssessment(x._2.riskId, x._2.ruleId, x._2.ruleName,
      if (x._2.ruleRisk <= 1.0) x._2.ruleRisk else 1.0,
      x._2.evaluationTime, x._2.areaCode, x._2.areaName, x._2.orgCode,
      x._2.orgName, x._2.devOnlyId, x._2.ip, x._2.ruleWeightSum,
      x._2.ruleIsOneTicketVeto, x._2.evaluationTimeMillis)
  }).toTable(streamTableEnv)
  .groupBy(tableConf.getProperty("rule.risk.table.group.field.select.group.field"))
  .select('risk_id, 'rule_id,
    update('rule_name, 'evaluation_time_millis) as 'rule_name,
    update('rule_risk, 'evaluation_time_millis).cast(Types.DOUBLE) as 'rule_risk,
    update('evaluation_time, 'evaluation_time_millis) as 'evaluation_time,
    update('area_code, 'evaluation_time_millis) as 'area_code,
    update('area_name, 'evaluation_time_millis) as 'area_name,
    update('org_code, 'evaluation_time_millis) as 'org_code,
    update('org_name, 'evaluation_time_millis) as 'org_name,
    'dev_only_id,
    update('ip, 'evaluation_time_millis) as 'ip,
    update('rule_weight_sum, 'evaluation_time_millis).cast(Types.DOUBLE) as 'rule_weight_sum,
    '\''.toString as 'rule_is_one_ticket_veto,
    '\''.toString as 'deducting_reason,
    '\''.toString as 'repair_method,
    update('evaluation_time_millis, 'evaluation_time_millis).cast(Types.LONG) as 'evaluation_time_millis)

// Convert the second-level rule-risk dynamic table into a stream.
val ruleRiskAssessmentStream = ruleRiskAssessmentTable.toRetractStream[RuleRiskAssessment]
  .filter(_._1)
  .map(_._2)
  .map(x => {
    // Package the second-level rule risk assessment record for the sink.
    Map(
      "stream_rule_id" -> x.ruleId,
      "stream_rule_risk" -> x.ruleRisk.toString,
      "stream_evaluation_time" -> x.evaluationTime,
      "stream_area_code" -> x.areaCode,
      "stream_area_name" -> x.areaName,
      "stream_org_code" -> x.orgCode,
      "stream_org_name" -> x.orgName,
      "stream_dev_only_id" -> x.devOnlyId,
      "stream_ip" -> x.ip,
      "stream_deducting_reason" -> x.deductingReason,
      "stream_repair_method" -> x.repairMethod,
      "stream_evaluation_time_millis" -> x.evaluationTimeMillis.toString)
  })

--
祁豪兵
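If the accumulator really does end up on the Kryo fallback, another thing to try without touching the class itself (an assumption building on the generic-type hypothesis above, not a verified fix for this job) is to register the type explicitly, since Kryo registrations that differ between the run that took the checkpoint and the run that restores it are a known way for a restored Kryo serializer to be judged incompatible:

// Sketch: keep Kryo registrations identical across runs.
// `env` is the job's StreamExecutionEnvironment.
env.getConfig.registerKryoType(classOf[NewColumn])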