你好:
flink1.10.1使用sql hop函数中使用udaf函数,报错如下,如何解决,已经有marge方法。 org.apache.flink.table.api.ValidationException: Function class 'com.autoai.cns.udaf.PercentileUDAF' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static UDAF如下: class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{ //各分位值 val percentile1 = 0.5 val percentile2 = 0.75 val percentile3 = 0.98 val percentile4 = 0.99 /** * 计算不同分位数对应的指标 * @param accumulator 累加器 * @return 不同分位数对应的统计值 */ override def getValue(accumulator: ListBuffer[Float]): String = { //窗口时间数据大小 val length = accumulator.size var i1 = Math.round(length*percentile1).toInt if(i1==0) i1 = 1 var i2 = Math.round(length*percentile2).toInt if(i2==0) i2 = 1 var i3 = Math.round(length*percentile3).toInt if(i3==0) i3 = 1 var i4 = Math.round(length*percentile4).toInt if(i4==0) i4 = 1 val seq = accumulator.sorted //返回结果 seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt } override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]() def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = { accumulator.append(i) } def marge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = { its.foreach(i => accumulator ++ i) } |
你好,根据你的描述,应该是“merge”方法,你写的是“marge”,单词拼的不正确,修改一下再试试,希望能帮助到你。
Evan Cheng 2021年1月18日09:00:07 发件人: bigdata 发送时间: 2021-01-17 22:31 收件人: user-zh 主题: flink sql hop使用udaf问题 你好: flink1.10.1使用sql hop函数中使用udaf函数,报错如下,如何解决,已经有marge方法。 org.apache.flink.table.api.ValidationException: Function class 'com.autoai.cns.udaf.PercentileUDAF' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static UDAF如下: class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{ //各分位值 val percentile1 = 0.5 val percentile2 = 0.75 val percentile3 = 0.98 val percentile4 = 0.99 /** * 计算不同分位数对应的指标 * @param accumulator 累加器 * @return 不同分位数对应的统计值 */ override def getValue(accumulator: ListBuffer[Float]): String = { //窗口时间数据大小 val length = accumulator.size var i1 = Math.round(length*percentile1).toInt if(i1==0) i1 = 1 var i2 = Math.round(length*percentile2).toInt if(i2==0) i2 = 1 var i3 = Math.round(length*percentile3).toInt if(i3==0) i3 = 1 var i4 = Math.round(length*percentile4).toInt if(i4==0) i4 = 1 val seq = accumulator.sorted //返回结果 seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt } override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]() def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = { accumulator.append(i) } def marge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = { its.foreach(i => accumulator ++ i) } |
Free forum by Nabble | Edit this page |