回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Posted by
Chennet Steven on
Nov 07, 2019; 11:06am
URL: http://apache-flink.370.s1.nabble.com/Window-can-only-be-defined-over-a-time-attribute-column-tp13p981.html
在flink1.9的flink-table-common中找到DataView这个接口和子类ListView,MapView,但是没有看懂如何在自定义函数中使用,
能否给个example或者是test代码的链接啊?
From stevenchen
webchat 38798579
________________________________
发件人: wenlong.lwl <
[hidden email]>
发送时间: Thursday, November 7, 2019 2:13:43 PM
收件人:
[hidden email] <
[hidden email]>
主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。
On Thu, 7 Nov 2019 at 09:22, Chennet Steven <
[hidden email]> wrote:
> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext
> 如何在聚合函数中使用State?
>
>
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
> TypeInformation}
> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.table.functions.{AggregateFunction,
> FunctionContext}
> import java.lang.{Iterable => JIterable}
>
>
> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>
> class IntDiffSumFunction extends AggregateFunction[Int,
> IntDiffSumAccumulator] {
>
> override def open(context: FunctionContext): Unit = {
> // Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
> //getRuntimeContext.getState(desc)
> val a = this.hashCode()
> print(s"hashCode:$a")
> super.open(context)
> }
>
> override def createAccumulator(): IntDiffSumAccumulator = {
> val acc = new IntDiffSumAccumulator()
> acc.f0 = 0
> acc.f1 = false
> acc
> }
>
> def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
> accumulator.f0 += value
> accumulator.f1 = true
> }
>
> override def getValue(accumulator: IntDiffSumAccumulator): Int = {
> if (accumulator.f1) {
>
> accumulator.f0
> } else {
> Int.MinValue
> }
> }
>
> def merge(acc: IntDiffSumAccumulator, its:
> JIterable[IntDiffSumAccumulator]) = {
> val iter = its.iterator()
> while (true) {
> val a = iter.next()
> if (a.f1) {
> acc.f0 += a.f0
> acc.f1 = true
> }
> }
> }
>
> def resetAccumulator(acc: IntDiffSumAccumulator) = {
> acc.f0 = 0
> acc.f1 = false
> }
>
> override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
> new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
> BasicTypeInfo.BOOLEAN_TYPE_INFO)
> }
>
>
> From stevenchen
> webchat 38798579
>
>
>