
Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Posted by Chennet Steven on Nov 07, 2019; 11:06am
URL: http://apache-flink.370.s1.nabble.com/Window-can-only-be-defined-over-a-time-attribute-column-tp13p981.html

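In Flink 1.9's flink-table-common I found the DataView interface and its subclasses ListView and MapView, but I could not work out how to use them in a user-defined function.
Could you point me to an example or to some test code?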

From stevenchen
         webchat 38798579

________________________________
From: wenlong.lwl <[hidden email]>
Sent: Thursday, November 7, 2019 2:13:43 PM
To: [hidden email] <[hidden email]>
Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

You could try 1.9. It introduced the DataView mechanism, which lets you use state inside the accumulator (Acc).
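
As a rough illustration of the DataView approach mentioned here, the following is a minimal sketch and not an official example. It assumes Flink 1.9 with flink-table-common on the classpath. The names DistinctCountAcc and DistinctCountFunction are hypothetical. The accumulator is written as a POJO-style class holding a MapView field. As far as I understand, in a streaming group aggregation the planner can back such a field with keyed state, while outside that context the MapView simply wraps an in-memory map.

```scala
import java.lang.{Integer => JInt, Long => JLong}

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator: a POJO-style class (no-arg constructor, mutable fields).
class DistinctCountAcc {
  // Occurrence count per distinct value. Assumption: this MapView field is what
  // the planner may replace with a state-backed view at runtime.
  var seen: MapView[String, JInt] = new MapView[String, JInt](Types.STRING, Types.INT)
  // Number of distinct values seen so far.
  var count: Long = 0L
}

// Hypothetical aggregate function that counts distinct String values.
class DistinctCountFunction extends AggregateFunction[JLong, DistinctCountAcc] {

  override def createAccumulator(): DistinctCountAcc = new DistinctCountAcc

  def accumulate(acc: DistinctCountAcc, value: String): Unit = {
    val n = acc.seen.get(value) // null when the key has not been seen yet
    if (n == null) {
      acc.seen.put(value, 1)
      acc.count += 1
    } else {
      acc.seen.put(value, n + 1)
    }
  }

  override def getValue(acc: DistinctCountAcc): JLong = acc.count
}
```

The function would be registered and called like any other user-defined aggregate. The state handling stays inside the accumulator, so open() and RuntimeContext are not involved.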

On Thu, 7 Nov 2019 at 09:22, Chennet Steven <[hidden email]> wrote:

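> I tried to use State in a Flink user-defined aggregate function, but found that the RuntimeContext cannot be obtained through the FunctionContext in open().
> How can I use State in an aggregate function?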
>
>
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}
> import java.lang.{Iterable => JIterable}
>
>
> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>
> class IntDiffSumFunction extends AggregateFunction[Int, IntDiffSumAccumulator] {
>
>   override def open(context: FunctionContext): Unit = {
>     // In Flink 1.7.2 the RuntimeContext is not available here, so State cannot be initialized
>     //getRuntimeContext.getState(desc)
>     val a = this.hashCode()
>     print(s"hashCode:$a")
>     super.open(context)
>   }
>
>   override def createAccumulator(): IntDiffSumAccumulator = {
>     val acc = new IntDiffSumAccumulator()
>     acc.f0 = 0
>     acc.f1 = false
>     acc
>   }
>
>   def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>     accumulator.f0 += value
>     accumulator.f1 = true
>   }
>
>   override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>     if (accumulator.f1) {
>
>       accumulator.f0
>     } else {
>       Int.MinValue
>     }
>   }
>
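>   def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]): Unit = {
>     val iter = its.iterator()
>     // Loop until the iterator is exhausted; while (true) would run past the end
>     // and throw NoSuchElementException from iter.next()
>     while (iter.hasNext) {
>       val a = iter.next()
>       if (a.f1) {
>         acc.f0 += a.f0
>         acc.f1 = true
>       }
>     }
>   }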
>
>   def resetAccumulator(acc: IntDiffSumAccumulator) = {
>     acc.f0 = 0
>     acc.f1 = false
>   }
>
>   override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>     new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO)
> }
>
>
> From stevenchen
>          webchat 38798579