http://apache-flink.370.s1.nabble.com/Window-can-only-be-defined-over-a-time-attribute-column-tp13p982.html
> On Nov 7, 2019, at 7:06 PM, Chennet Steven <[hidden email]> wrote:
>
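> In Flink 1.9's flink-table-common I found the DataView interface and its subclasses ListView and MapView, but I could not work out how to use them in a user-defined function.
> Could you share an example or a link to some test code?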
>
> From stevenchen
> webchat 38798579
>
> ________________________________
> From: wenlong.lwl <[hidden email]>
> Sent: Thursday, November 7, 2019 2:13:43 PM
> To: [hidden email] <[hidden email]>
> Subject: Re: Flink 1.7.2: how to use custom state in a user-defined aggregate function of the Table API
>
> You could try 1.9. It introduced the DataView mechanism, which lets you use state in the accumulator (Acc).
>
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven <[hidden email]> wrote:
>
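>> I am trying to use State in a user-defined aggregate function in Flink, but in the open function the FunctionContext gives no access to the RuntimeContext.
>> How can I use State in an aggregate function?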
>>
>>
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo
>> import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}
>> import java.lang.{Iterable => JIterable}
>>
>>
>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>
>> class IntDiffSumFunction extends AggregateFunction[Int, IntDiffSumAccumulator] {
>>
>>   override def open(context: FunctionContext): Unit = {
>>     // In Flink 1.7.2 the RuntimeContext is not accessible here, so State cannot be initialized.
>>     // getRuntimeContext.getState(desc)
>>     val a = this.hashCode()
>>     print(s"hashCode:$a")
>>     super.open(context)
>>   }
>>
>>   override def createAccumulator(): IntDiffSumAccumulator = {
>>     val acc = new IntDiffSumAccumulator()
>>     acc.f0 = 0
>>     acc.f1 = false
>>     acc
>>   }
>>
>>   def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>>     accumulator.f0 += value
>>     accumulator.f1 = true
>>   }
>>
>>   override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>>     if (accumulator.f1) {
>>       accumulator.f0
>>     } else {
>>       Int.MinValue
>>     }
>>   }
>>
>>   def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]): Unit = {
>>     val iter = its.iterator()
>>     // Stop once the iterator is exhausted; looping on `true` would make next() throw.
>>     while (iter.hasNext) {
>>       val a = iter.next()
>>       if (a.f1) {
>>         acc.f0 += a.f0
>>         acc.f1 = true
>>       }
>>     }
>>   }
>>
>>   def resetAccumulator(acc: IntDiffSumAccumulator): Unit = {
>>     acc.f0 = 0
>>     acc.f1 = false
>>   }
>>
>>   override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>>     new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>>       BasicTypeInfo.BOOLEAN_TYPE_INFO)
>> }
>>
>>
>> From stevenchen
>> webchat 38798579
>>
>>
>>
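
On the DataView question raised in this thread, here is a minimal sketch of how a MapView might be held in an accumulator, assuming Flink 1.9 with flink-table-common on the classpath. It is loosely modeled on the CountDistinct aggregate in Flink's own test sources (JavaUserDefinedAggFunctions, if memory serves). The names DistinctCountAccumulator, DistinctCountFunction, seen and count are hypothetical and chosen only for illustration. Whether the planner actually backs the MapView with keyed state depends on the planner and the query, so treat this as a starting point rather than a verified recipe.

```scala
import java.lang.{Integer => JInt, Long => JLong}

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator: a plain class with a no-arg constructor and vars,
// so that Flink can treat it as a POJO. The MapView field is the part that the
// planner may replace with a state-backed view at runtime (assumption).
class DistinctCountAccumulator {
  var seen: MapView[String, JInt] = _
  var count: Long = 0L
}

// Hypothetical aggregate function that counts distinct String values.
class DistinctCountFunction extends AggregateFunction[JLong, DistinctCountAccumulator] {

  override def createAccumulator(): DistinctCountAccumulator = {
    val acc = new DistinctCountAccumulator
    // Key and value types are passed explicitly to the MapView.
    acc.seen = new MapView[String, JInt](Types.STRING, Types.INT)
    acc
  }

  def accumulate(acc: DistinctCountAccumulator, value: String): Unit = {
    val occurrences = acc.seen.get(value) // null when the key has not been seen yet
    if (occurrences == null) {
      acc.seen.put(value, 1)
      acc.count += 1
    } else {
      acc.seen.put(value, occurrences + 1)
    }
  }

  override def getValue(acc: DistinctCountAccumulator): JLong = acc.count
}
```

Outside a running job the MapView simply behaves like an in-memory map, so the function can be exercised directly by calling createAccumulator, accumulate and getValue by hand. The design point is that no RuntimeContext is needed: the state lives in the accumulator field instead of being fetched in open.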