|
你把你的代码关键信息打码后贴出来让大家看看,现有信息看不出来,不要粘贴图片,图片看不到
祝好! 发件人: bigdata 发送时间: 2021-01-18 14:52 收件人: user-zh 主题: flink sql hop函数使用udaf,跟踪源码对merge方法进行参数校验时问题 你好: flink1.10.1 sql在使用hop后并将udaf中merge方法名修改正确后,报如下错:为什么会说找不到匹配的merge方法呢 org.apache.flink.table.planner.codegen.CodeGenException: No matching merge method found for AggregateFunction com.autoai.cns.udaf.PercentileUDAF' 跟踪merge 参数校验源码思路: 1、ImperativeAggCodeGen类的checkNeededMethods方法中if (needMerge) getUserDefinedMethod 2、UserDefinedFunctionUtils类的getUserDefinedMethod方法中,当校验merge第二个参数时(代码详见下面merge)为false,进而导致报错,如何解决?还是说这是flink的一个bug parameterClassEquals(methodSignature(i), clazz) || parameterDataTypeEquals(internalTypes(i), dataTypes(i)) 代码如下: def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = { its.foreach(i => accumulator ++ i) } |
In reply to this post by bigdata
代码如下:
class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{ //各分位值 val percentile1 = 0.5 val percentile2 = 0.75 val percentile3 = 0.98 val percentile4 = 0.99 override def getValue(accumulator: ListBuffer[Float]): String = { //窗口时间数据大小 val length = accumulator.size var i1 = Math.round(length*percentile1).toInt if(i1==0) i1 = 1 var i2 = Math.round(length*percentile2).toInt if(i2==0) i2 = 1 var i3 = Math.round(length*percentile3).toInt if(i3==0) i3 = 1 var i4 = Math.round(length*percentile4).toInt if(i4==0) i4 = 1 val seq = accumulator.sorted //返回结果 seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt } override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]() def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = { accumulator.append(i) } def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = { its.foreach(i => accumulator ++ i) } ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2021年1月18日(星期一) 下午3:06 收件人: "user-zh"<[hidden email]>; 主题: 回复: flink sql hop函数使用udaf,跟踪源码对merge方法进行参数校验时问题 你把你的代码关键信息打码后贴出来让大家看看,现有信息看不出来,不要粘贴图片,图片看不到 祝好! 发件人: bigdata 发送时间: 2021-01-18 14:52 收件人: user-zh 主题: flink sql hop函数使用udaf,跟踪源码对merge方法进行参数校验时问题 你好: flink1.10.1 sql在使用hop后并将udaf中merge方法名修改正确后,报如下错:为什么会说找不到匹配的merge方法呢 org.apache.flink.table.planner.codegen.CodeGenException: No matching merge method found for AggregateFunction com.autoai.cns.udaf.PercentileUDAF' 跟踪merge 参数校验源码思路: 1、ImperativeAggCodeGen类的checkNeededMethods方法中if (needMerge) getUserDefinedMethod 2、UserDefinedFunctionUtils类的getUserDefinedMethod方法中,当校验merge第二个参数时(代码详见下面merge)为false,进而导致报错,如何解决?还是说这是flink的一个bug parameterClassEquals(methodSignature(i), clazz) || parameterDataTypeEquals(internalTypes(i), dataTypes(i)) 代码如下: def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = { its.foreach(i => accumulator ++ i) } |
Free forum by Nabble | Edit this page |