Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Posted by Chennet Steven on Nov 07, 2019; 1:22am
URL: http://apache-flink.370.s1.nabble.com/Window-can-only-be-defined-over-a-time-attribute-column-tp13p975.html
尝试在 Flink 1.7.2 的 Table API 自定义聚合函数(AggregateFunction)中使用 State,但发现在 open 函数中无法通过 FunctionContext 获取到 RuntimeContext,因此无法初始化 State。
请问在自定义聚合函数中应该如何使用 State?示例代码如下:
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}
import java.lang.{Iterable => JIterable}
/**
 * Accumulator for [[IntDiffSumFunction]], stored as a Flink Java `Tuple2`.
 *
 *  - `f0`: the running integer sum.
 *  - `f1`: flag set to `true` once at least one value has been accumulated.
 */
class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
/**
 * Table API aggregate function that sums `Int` inputs.
 *
 * The accumulator keeps the running sum in `f0` and a "has seen a value" flag
 * in `f1`. When no value has been accumulated, [[getValue]] returns
 * `Int.MinValue` as a sentinel.
 */
class IntDiffSumFunction extends AggregateFunction[Int, IntDiffSumAccumulator] {

  /**
   * Lifecycle hook invoked with the function context.
   *
   * In Flink 1.7.2 the RuntimeContext cannot be obtained here, so keyed state
   * cannot be initialized from this method.
   *
   * @param context the function context supplied by the framework
   */
  override def open(context: FunctionContext): Unit = {
    // Flink 1.7.2: no RuntimeContext is reachable here, so State cannot be initialized.
    //getRuntimeContext.getState(desc)
    val a = this.hashCode()
    // Debug output identifying this function instance.
    print(s"hashCode:$a")
    super.open(context)
  }

  /**
   * Creates an empty accumulator: sum `0`, nothing seen yet.
   *
   * @return a fresh accumulator with `f0 = 0` and `f1 = false`
   */
  override def createAccumulator(): IntDiffSumAccumulator = {
    val acc = new IntDiffSumAccumulator()
    acc.f0 = 0
    acc.f1 = false
    acc
  }

  /**
   * Adds one input value to the accumulator and marks it as non-empty.
   *
   * @param accumulator the accumulator to update in place
   * @param value       the value to add to the running sum
   */
  def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
    accumulator.f0 += value
    accumulator.f1 = true
  }

  /**
   * Returns the aggregation result.
   *
   * @param accumulator the accumulator to read
   * @return the running sum, or `Int.MinValue` if no value was accumulated
   */
  override def getValue(accumulator: IntDiffSumAccumulator): Int = {
    if (accumulator.f1) {
      accumulator.f0
    } else {
      Int.MinValue
    }
  }

  /**
   * Merges a group of accumulators into `acc`.
   *
   * Only accumulators that have seen at least one value (`f1 == true`)
   * contribute to the sum.
   *
   * @param acc the target accumulator, updated in place
   * @param its the accumulators to merge into `acc`
   */
  def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]): Unit = {
    val iter = its.iterator()
    // Stop when the iterator is exhausted; an unconditional loop would always
    // end in NoSuchElementException from next().
    while (iter.hasNext) {
      val other = iter.next()
      if (other.f1) {
        acc.f0 += other.f0
        acc.f1 = true
      }
    }
  }

  /**
   * Resets the accumulator to its empty state.
   *
   * @param acc the accumulator to reset in place
   */
  def resetAccumulator(acc: IntDiffSumAccumulator): Unit = {
    acc.f0 = 0
    acc.f1 = false
  }

  /**
   * Type information for the accumulator.
   *
   * The tuple class is passed explicitly so that the type information refers
   * to `IntDiffSumAccumulator` itself rather than the plain `Tuple2` base class.
   *
   * @return tuple type information with an int field and a boolean field
   */
  override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
    new TupleTypeInfo[IntDiffSumAccumulator](
      classOf[IntDiffSumAccumulator],
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.BOOLEAN_TYPE_INFO)
}
From stevenchen
webchat 38798579