flink sql hop使用udaf问题

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

flink sql hop使用udaf问题

bigdata
你好:
    flink1.10.1使用sql hop函数中使用udaf函数,报错如下,如何解决,已经有marge方法。
org.apache.flink.table.api.ValidationException: Function class 'com.autoai.cns.udaf.PercentileUDAF' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static
UDAF如下:
class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{
  //各分位值
  val percentile1 = 0.5
  val percentile2 = 0.75
  val percentile3 = 0.98
  val percentile4 = 0.99
  /**
    * 计算不同分位数对应的指标
    * @param accumulator 累加器
    * @return 不同分位数对应的统计值
    */
  override def getValue(accumulator: ListBuffer[Float]): String = {
    //窗口时间数据大小
    val length = accumulator.size
    var i1 = Math.round(length*percentile1).toInt
    if(i1==0) i1 = 1
    var i2 = Math.round(length*percentile2).toInt
    if(i2==0) i2 = 1
    var i3 = Math.round(length*percentile3).toInt
    if(i3==0) i3 = 1
    var i4 = Math.round(length*percentile4).toInt
    if(i4==0) i4 = 1
    val seq = accumulator.sorted
    //返回结果
    seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt
  }
  override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]()
  def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = {
    accumulator.append(i)
  }
  def marge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = {
    its.foreach(i => accumulator ++ i)
  }
Reply | Threaded
Open this post in threaded view
|

回复: flink sql hop使用udaf问题

Evan
你好,根据你的描述,应该是“merge”方法,你写的是“marge”,单词拼的不正确,修改一下再试试,希望能帮助到你。


Evan Cheng
2021年1月18日09:00:07



 
发件人: bigdata
发送时间: 2021-01-17 22:31
收件人: user-zh
主题: flink sql hop使用udaf问题
你好:
    flink1.10.1使用sql hop函数中使用udaf函数,报错如下,如何解决,已经有marge方法。
org.apache.flink.table.api.ValidationException: Function class 'com.autoai.cns.udaf.PercentileUDAF' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static
UDAF如下:
class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{
  //各分位值
  val percentile1 = 0.5
  val percentile2 = 0.75
  val percentile3 = 0.98
  val percentile4 = 0.99
  /**
    * 计算不同分位数对应的指标
    * @param accumulator 累加器
    * @return 不同分位数对应的统计值
    */
  override def getValue(accumulator: ListBuffer[Float]): String = {
    //窗口时间数据大小
    val length = accumulator.size
    var i1 = Math.round(length*percentile1).toInt
    if(i1==0) i1 = 1
    var i2 = Math.round(length*percentile2).toInt
    if(i2==0) i2 = 1
    var i3 = Math.round(length*percentile3).toInt
    if(i3==0) i3 = 1
    var i4 = Math.round(length*percentile4).toInt
    if(i4==0) i4 = 1
    val seq = accumulator.sorted
    //返回结果
    seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt
  }
  override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]()
  def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = {
    accumulator.append(i)
  }
  def marge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = {
    its.foreach(i => accumulator ++ i)
  }