Why am I getting "Window can only be defined over a time attribute column."?

13 messages
Why am I getting "Window can only be defined over a time attribute column."?

Chennet Steven

Flink 1.4.2

Running the SQL: SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR), Tag
throws an exception, yet the type of the UptMs column appears to be Timestamp?
Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
         at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:69)
         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:82)
         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:87)
         at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
         at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
         at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:506)
         at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:385)
         at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
         at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
         at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
         at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:197)
         at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:257)
         at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:665)
         at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
         at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
         at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
         at com.teld.bdp.tablewindow.TubleApp$.main(TubleApp.scala:56)
         at com.teld.bdp.tablewindow.TubleApp.main(TubleApp.scala)
Disconnected from the target VM, address: '127.0.0.1:60155', transport: 'socket'


The code is as follows:


package com.teld.bdp.tablewindow

import java.io.Serializable

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.api.java.tuple.Tuple3
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

import org.apache.flink.table.api.TableEnvironment

// Flink 1.4.2
object TubleApp {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.createLocalEnvironment(1)
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = TableEnvironment.getTableEnvironment(senv)
    tableEnv.registerFunction("TOTIMESTAMP", new ConvertLongToTimeStamp())

    // 2019-06-20 10:27:57 1560997677000
    // 2019-06-20 11:27:57 1561001277000
    // 2019-06-20 12:27:57 1561004877000
    val source = senv.fromElements(
      new Tuple3("a", 1, 1560997677000L),
      new Tuple3("b", 1, 1560997677000L),

      new Tuple3("a", 2, 1561001277000L),
      new Tuple3("b", 2, 1561001277000L),

      new Tuple3("a", 3, 1561004877000L),
      new Tuple3("b", 3, 1561004877000L)
    )

    import org.apache.flink.table.api.scala._
    tableEnv.registerDataStream("Temp", source, 'Tag, 'PowerX, 'UptTime)

    //----------------------------- Source table -----------------------
    val sourceTable = tableEnv.sqlQuery("select * from Temp")
    // root
    // |-- Tag: String
    // |-- PowerX: Integer
    // |-- UptTime: Long
    sourceTable.printSchema()
    tableEnv.registerTable("SourceTable", sourceTable)

    //----------------------------- Convert to Timestamp -----------------------
    val timeTable = tableEnv.sqlQuery("SELECT Tag, PowerX, UptTime, TOTIMESTAMP(UptTime) AS UptMs FROM SourceTable")
    // root
    // |-- Tag: String
    // |-- PowerX: Integer
    // |-- UptTime: Long
    // |-- UptMs: Timestamp
    timeTable.printSchema()
    tableEnv.registerTable("TimeTable", timeTable)
    tableEnv.toAppendStream[Row](timeTable).print()

    //------------------------------agg-------------------------------------
    val aggTable = tableEnv.sqlQuery("SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR),Tag")
    // root
    // |-- Tag: String
    // |-- EXPR$1: Integer
    aggTable.printSchema()
    // Why does the aggregated table below throw an exception?
    // Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
    tableEnv.toRetractStream[Row](aggTable).print()


    senv.execute("abc")
  }
}

class ConvertLongToTimeStamp extends ScalarFunction with Serializable {
  def eval(uptTime: Long): Long = uptTime

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.SQL_TIMESTAMP
}
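In Flink 1.4, the usual cause of this exception is that a time attribute can only be declared while registering the table: a scalar UDF that returns SQL_TIMESTAMP merely produces an ordinary Timestamp column, so TUMBLE(UptMs, ...) has nothing to window on. A minimal sketch of the common fix against the code above (an illustration, assuming UptTime holds epoch milliseconds and a 5-second out-of-orderness bound is acceptable):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Assign timestamps/watermarks on the stream first, then replace the Long
// field with a rowtime attribute when registering the table.
val withWatermarks = source.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[Tuple3[String, Int, Long]](Time.seconds(5)) {
    override def extractTimestamp(e: Tuple3[String, Int, Long]): Long = e.f2
  })
tableEnv.registerDataStream("Temp", withWatermarks, 'Tag, 'PowerX, 'UptTime.rowtime)
// TUMBLE(UptTime, INTERVAL '1' HOUR) then validates, because UptTime is a time attribute.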



From stevenchen
         webchat 38798579


Re: Why am I getting "Window can only be defined over a time attribute column."?

Chennet Steven
It seems the problem boils down to this:

The program flow is as follows (WM = watermark):
source (no WM) --> transform A --> TableA (no WM) --> transform B --> TableB (no WM) --> transform C (SQL TUMBLE) --> TableC --> Sink
To make the SQL in transform C execute successfully, how do I assign a watermark on TableB?
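One pattern that should work here (a sketch, not verified against this exact job): a derived Table cannot gain a time attribute after the fact in 1.4, so round-trip TableB through the DataStream API, assign watermarks there, and re-register it with a rowtime field. The table name and field layout below are assumptions:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.types.Row

// tableB is the derived Table without a watermark; field 2 is assumed to be
// the Long event-time column (UptTime).
val streamB = tableEnv.toAppendStream[Row](tableB)
val streamBWithWm = streamB.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[Row](Time.seconds(0)) {
    override def extractTimestamp(r: Row): Long = r.getField(2).asInstanceOf[Long]
  })
tableEnv.registerDataStream("TableBWithTime", streamBWithWm, 'Tag, 'PowerX, 'UptTime.rowtime)
// Transform C can then TUMBLE on UptTime against TableBWithTime.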


From stevenchen
         webchat 38798579


Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Chennet Steven
I am trying to use state in a Flink user-defined aggregate function, but the open method's FunctionContext gives no access to the RuntimeContext.
How can I use state inside an aggregate function?


import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}
import java.lang.{Iterable => JIterable}


class IntDiffSumAccumulator extends JTuple2[Int, Boolean]

class IntDiffSumFunction extends AggregateFunction[Int, IntDiffSumAccumulator] {

  override def open(context: FunctionContext): Unit = {
    // Flink 1.7.2: the RuntimeContext cannot be obtained here, so there is no way to initialize state
    //getRuntimeContext.getState(desc)
    val a = this.hashCode()
    print(s"hashCode:$a")
    super.open(context)
  }

  override def createAccumulator(): IntDiffSumAccumulator = {
    val acc = new IntDiffSumAccumulator()
    acc.f0 = 0
    acc.f1 = false
    acc
  }

  def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
    accumulator.f0 += value
    accumulator.f1 = true
  }

  override def getValue(accumulator: IntDiffSumAccumulator): Int = {
    if (accumulator.f1) {

      accumulator.f0
    } else {
      Int.MinValue
    }
  }

  def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]): Unit = {
    val iter = its.iterator()
    while (iter.hasNext) {
      val a = iter.next()
      if (a.f1) {
        acc.f0 += a.f0
        acc.f1 = true
      }
    }
  }

  def resetAccumulator(acc: IntDiffSumAccumulator) = {
    acc.f0 = 0
    acc.f1 = false
  }

  override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
    new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO)
}


From stevenchen
         webchat 38798579



Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Wenlong Lyu
You could try 1.9: it introduced the DataView mechanism, which lets you use state in the accumulator.


Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Chennet Steven
I found the DataView interface and its subclasses ListView and MapView in flink-table-common of Flink 1.9, but I do not see how to use them in a user-defined function.
Could you give an example or a link to test code?

From stevenchen
         webchat 38798579


Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Dian Fu
You can refer to an existing example in the Flink codebase: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java
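A minimal sketch of the DataView pattern (hedged: written from the Flink 1.9 flink-table-common API; the class, function, and field names are invented for illustration). The view is declared as a field of the accumulator, and the runtime backs it with keyed state where possible instead of keeping it on the heap:

import java.lang.{Long => JLong}
import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

// Accumulator whose MapView is substituted with a state-backed view at runtime.
class CountPerKeyAcc {
  var counts: MapView[String, JLong] = new MapView[String, JLong]()
}

// Illustrative UDAF: counts occurrences per key and returns the overall total.
class CountPerKey extends AggregateFunction[JLong, CountPerKeyAcc] {
  override def createAccumulator(): CountPerKeyAcc = new CountPerKeyAcc

  def accumulate(acc: CountPerKeyAcc, key: String): Unit = {
    val current = acc.counts.get(key)   // reads through to state when backed by it
    acc.counts.put(key, if (current == null) 1L else current + 1L)
  }

  override def getValue(acc: CountPerKeyAcc): JLong = {
    var total = 0L
    val it = acc.counts.iterator
    while (it.hasNext) total += it.next().getValue
    total
  }
}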


Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Chennet Steven
The scenario: with a one-minute window, compute each sensor's maximum temperature per minute, and also the difference between the current and previous minute's maxima, using flink-sql. I wanted to write a custom Table UDAF and use state in it to store the previous minute's maximum, but the RuntimeContext in the UDAF open method's FunctionContext is private and cannot be used. Meanwhile, a DataView is a property of the UDAF's accumulator, and a new accumulator is created for every window, so the previous window's result cannot be carried into the next window via the ACC/DataView. Is my understanding correct?
What is the recommended way in Flink SQL to compute the difference of aggregate values between two windows?

From stevenchen
         webchat 38798579


Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Dian Fu
1) In the Table API & SQL, the RuntimeContext is not exposed to users, which is why it is private.
2) For differences of aggregate values between windows, check whether CEP can meet the need; see the documentation: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/match_recognize.html


Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Yuan,Youjun
This scenario should be computable with standard SQL. The rough idea:
1. The inner query computes each device's one-minute maximum temperature: max(temp) AS max_temperature plus a TUMBLE window.
2. The middle query uses a ROW OVER window to fetch the current minute's max_temperature together with the sum of the current and preceding minutes' maxima, i.e. SUM(max_temperature) AS sum_temperature.
3. The outermost query then simply selects 2 * max_temperature - sum_temperature, which is exactly the difference between the two consecutive minutes' maxima.

Assume the input messages have three fields:
Ts: timestamp
Deviceid: device ID
Temp: device temperature

The complete SQL:
INSERT INTO mysink
SELECT ts, deviceid,  2 * max_temperature - sum_temperature AS diff_temperature
FROM (
        SELECT  deviceid, ts, max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature
        FROM (
                SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS max_temperature  from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid
                )
        )
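(With ROWS BETWEEN 1 PRECEDING AND CURRENT ROW, sum_temperature for minute t is m_t + m_(t-1), so 2 * max_temperature - sum_temperature = 2*m_t - (m_t + m_(t-1)) = m_t - m_(t-1): the wanted consecutive-minute difference. For the very first row there is no preceding minute, the sum degenerates to m_t, and the result is m_t itself.)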

With the following test data:
"20000,dev1,1.2",
"50000,dev1,1.3",
"60000,dev1,1.4",
"100000,dev1,1.5",
"110000,dev1,1.6",
"120000,dev1,1.7"
running it produces:
{"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}

If you want to verify this end to end, you can:
1. Log in to http://creek.baidubce.com/
2. In the job-definition input box, paste the job definition (JSON) at the end of this mail.
3. Click to generate an executable; in the dialog that pops up, pick your machine's OS and CPU arch and confirm.
After a few seconds the system generates a complete executable; run it and you will see the results on the console. If you want to test with more data, change the source type to STDIN so you can feed data from the command line.

Job definition (JSON):
{
    "注释":{
        "说明": "内层query计算每个设备每分钟的最大温度,max+tumble窗口;外层query计算同一设备前后2分钟最大温度差, Row over窗口;计算温度差的计算方法为:当前窗口最大温度 x 2 - 前后2个窗口最大温度的和。本例采用预先配置的输入数据,即source type=COLLECTION,如果需要尝试更多的输入,可以将type改为STDIN,即从标准输入读入数据。",
        "输入示例": "1000,dev1,2.3",
        "输出示例": {"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
    },
    "sources": [{
        "schema": {
            "format": "CSV",
            "fields": [{
                    "name": "ts",
                    "type": "SQL_TIMESTAMP"
                },
                {
                    "name": "deviceid",
                    "type": "STRING"
                },
                {
                    "name": "temp",
                    "type": "DOUBLE"
                }]
        },
        "watermark": 0,
        "name": "mysrc",
        "eventTime": "ts",
        "type": "COLLECTION",
        "attr": {
            "input": [
                "10000,dev1,1.1",
                "20000,dev1,1.2",
                "50000,dev1,1.3",
                "60000,dev1,1.4",
                "100000,dev1,1.5",
                "110000,dev1,1.6",
                "120000,dev1,1.7"
            ]
        }
    }],
    "sink": {
        "schema": {
            "format": "JSON"
        },
        "name": "mysink",
        "type": "STDOUT"
    },
    "name": "demojob",
    "timeType": "EVENTTIME",
    "sql": "INSERT INTO mysink SELECT ts, deviceid,  2 * max_temperature - sum_temperature AS diff_temperature FROM ( SELECT  deviceid, ts, max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS max_temperature  from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid)) "
}




Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Chennet Steven
Yuan, thanks a lot for the reply and the solution. I tried it in code and it does work, but when computing diff_temperature for the earliest minute, since there is no earlier minute of data, diff_temperature comes out as the first minute's temperature itself. Is there a way to set it to null?

The run produces:
{"deviceid":"dev1","diff_temperature":1.3,"ts":59999}     ---- can this minute's 1.3 be set to null somehow?
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}


From stevenchen
         webchat 38798579


Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Yuan,Youjun
SQL has no way to express this "earliest minute" logic.
If you insert a temperature=0 message at the very start of your stream, the first output will have diff_temperature=0; I am not sure whether that is acceptable for you.


Flink 1.9.1: how to read Kafka 0.8 JSON data with the Table API

Chennet Steven
I just upgraded from 1.7 to 1.9 and the way Kafka is read seems to have changed. I could not find an example of reading Kafka under 1.9; can anyone point me to a demo?

The code below does not run under 1.9; it reports:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'UptTimeMs' could not be resolved by the field mapping.
         at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
         at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
         at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
         at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
         at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
         at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
         at org.apache.flink.table.sources.TableSourceValidation.validateTimestampExtractorArguments(TableSourceValidation.java:204)
         at org.apache.flink.table.sources.TableSourceValidation.validateTableSource(TableSourceValidation.java:70)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.validateTableSource(TableEnvironmentImpl.java:435)
         at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.validateTableSource(StreamTableEnvironmentImpl.java:329)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSourceInternal(TableEnvironmentImpl.java:516)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSource(TableEnvironmentImpl.java:200)
         at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:70)
         at com.teld.demo.Kafka08App.FunA(Kafka08App.java:69)
         at com.teld.demo.Kafka08App.main(Kafka08App.java:23)


The code is as follows:


private static void FunA() throws Exception {
    Configuration localConfig = new Configuration();
    localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment ste = StreamTableEnvironment.create(env);

    Kafka kafka08 = new Kafka()
            .version("0.8")
            .topic("BDP-ChargingMinuteMetric")
            .startFromEarliest()
            .property("zookeeper.connect", "hdpjntest.chinacloudapp.cn:2182/kafka08")
            .property("bootstrap.servers", "telddruidteal.chinacloudapp.cn:9095")
            .property("group.id", "abc");


    Schema schema = new Schema()
            .field("UptTimeMs", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
            .field("BillId", Types.STRING)
            .field("SOC", Types.DOUBLE)
            .field("HighestVoltage", Types.DOUBLE);


    TypeInformation<?>[] types = new TypeInformation<?>[]{Types.SQL_TIMESTAMP, Types.STRING, Types.DOUBLE, Types.DOUBLE};
    String[] fieldNames = new String[]{"UptTimeMs", "BillId", "SOC", "HighestVoltage"};
    TypeInformation<Row> typeInformation = new RowTypeInfo(types, fieldNames);
    FormatDescriptor formatDescriptor = new Json().failOnMissingField(false).schema(typeInformation).deriveSchema();

    ste.connect(kafka08).withFormat(formatDescriptor).withSchema(schema).inAppendMode().registerTableSource("SourceTable");
    Table table = ste.sqlQuery("select * from SourceTable");

    DataStream<Row> rowDataStream = ste.toAppendStream(table, Row.class);
    rowDataStream.print();

    ste.execute("ABC");
}



From stevenchen
         webchat 38798579



Re: Flink 1.9.1: how to read Kafka 0.8 JSON data with the Table API

Chennet Steven
A Kafka value looks like this:
{
  "BillID": "230c95c6-346c-4070-9b49-b3bbbf6691db",
  "BillCode": "201912230300118165",
  "UptTimeMs": 1577091480000,
  "SOC": 0.86,
  "HighestVoltage": 4.019999980926514
}

where "UptTimeMs": 1577091480000 is the millisecond value since 1970.

The code, modified as follows:
private static void FunA() throws Exception {
    Configuration localConfig = new Configuration();
    localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment ste = StreamTableEnvironment.create(env);

    Kafka kafka08 = new Kafka()
            .version("0.8")
            .topic("BDP-ChargingMinuteMetric")
            .startFromEarliest()
            .property("zookeeper.connect", "def:2182/ ")
            .property("bootstrap.servers", "abc:9095")
            .property("group.id", "abc");


    Schema schema = new Schema()
            .field("rowtime", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
            .field("BillID", Types.STRING)
            .field("SOC", Types.DOUBLE)
            .field("HighestVoltage", Types.DOUBLE);


    TypeInformation<?>[] types = new TypeInformation<?>[]{Types.SQL_TIMESTAMP, Types.STRING, Types.DOUBLE, Types.DOUBLE};
    String[] fieldNames = new String[]{"UptTimeMs", "BillId", "SOC", "HighestVoltage"};
    TypeInformation<Row> typeInformation = new RowTypeInfo(types, fieldNames);
    FormatDescriptor formatDescriptor = new Json().failOnMissingField(true).schema(typeInformation).deriveSchema();

    ste.connect(kafka08).withFormat(formatDescriptor).withSchema(schema).inAppendMode().registerTableSource("SourceTable");
    Table table = ste.sqlQuery("select * from SourceTable");

    DataStream<Row> rowDataStream = ste.toAppendStream(table, Row.class);
    rowDataStream.print();

    ste.execute("ABC");
}

reports this error:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
         at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
         at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
         at org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410)
         at com.teld.demo.Kafka08App.FunA(Kafka08App.java:75)
         at com.teld.demo.Kafka08App.main(Kafka08App.java:23)
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON object.
         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
         at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to deserialize JSON object.
         at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
         at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
         at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
         at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:374)
Caused by: java.time.format.DateTimeParseException: Text '1577085840000' could not be parsed at index 0
         at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
         at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
         at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
         at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
         at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
         at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
         at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
         at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)

Looking at the source code:
https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
it turns out that the rowtime field must be of type SQL_TIMESTAMP, and SQL_TIMESTAMP values are parsed with RFC3339_TIMESTAMP_FORMAT, which expects a String. If the field is a millisecond value since 1970, how should this case be handled?
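
One workaround, sketched below rather than taken from a verified setup: declare UptTimeMs as a plain LONG in the JSON format, so the deserializer never runs it through RFC3339_TIMESTAMP_FORMAT, and define a separate logical rowtime attribute in the table schema that extracts its timestamps from that long field. This assumes the ExistingField extractor behind Rowtime.timestampsFromField also accepts Long fields and interprets them as epoch milliseconds; the method name FunB and the hostnames are placeholders, not from the original posts.

private static void FunB() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment ste = StreamTableEnvironment.create(env);

    Kafka kafka08 = new Kafka()
            .version("0.8")
            .topic("BDP-ChargingMinuteMetric")
            .startFromEarliest()
            .property("zookeeper.connect", "zk:2181/kafka08")   // placeholder
            .property("bootstrap.servers", "broker:9092")       // placeholder
            .property("group.id", "abc");

    // Format side: UptTimeMs stays a raw epoch-millis LONG, so JSON
    // deserialization succeeds without any timestamp parsing.
    TypeInformation<?>[] types = new TypeInformation<?>[]{Types.LONG, Types.STRING, Types.DOUBLE, Types.DOUBLE};
    String[] fieldNames = new String[]{"UptTimeMs", "BillID", "SOC", "HighestVoltage"};
    FormatDescriptor formatDescriptor = new Json()
            .failOnMissingField(false)
            .schema(new RowTypeInfo(types, fieldNames));   // no deriveSchema()

    // Table side: a separate rowtime attribute whose timestamps come from the
    // physical LONG field; the long-to-timestamp conversion happens here, not
    // in the JSON format.
    Schema schema = new Schema()
            .field("rowtime", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
            .field("BillID", Types.STRING)
            .field("SOC", Types.DOUBLE)
            .field("HighestVoltage", Types.DOUBLE);

    ste.connect(kafka08).withFormat(formatDescriptor).withSchema(schema).inAppendMode().registerTableSource("SourceTable");

    Table table = ste.sqlQuery("select * from SourceTable");
    ste.toAppendStream(table, Row.class).print();
    ste.execute("ABC");
}

With this split the physical row never carries a timestamp-typed field, and the rowtime attribute is derived after deserialization, which sidesteps the RFC 3339 parsing entirely.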




From stevenchen
         webchat 38798579

From: Chennet Steven<mailto:[hidden email]>
Sent: Tuesday, December 24, 2019 15:53
To: [hidden email]<mailto:[hidden email]>
Subject: Flink 1.9.1: how to read Kafka 0.8 JSON data with the Table API

I just upgraded from 1.7 to 1.9, and the way Kafka is read seems to have changed. I have not found an example of reading Kafka with 1.9; can anyone share a link to a demo?

The code below does not run under 1.9; it fails with:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'UptTimeMs' could not be resolved by the field mapping.
         at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
         at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
         at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
         at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
         at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
         at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
         at org.apache.flink.table.sources.TableSourceValidation.validateTimestampExtractorArguments(TableSourceValidation.java:204)
         at org.apache.flink.table.sources.TableSourceValidation.validateTableSource(TableSourceValidation.java:70)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.validateTableSource(TableEnvironmentImpl.java:435)
         at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.validateTableSource(StreamTableEnvironmentImpl.java:329)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSourceInternal(TableEnvironmentImpl.java:516)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSource(TableEnvironmentImpl.java:200)
         at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:70)
         at com.teld.demo.Kafka08App.FunA(Kafka08App.java:69)
         at com.teld.demo.Kafka08App.main(Kafka08App.java:23)


The code is as follows:


private static void FunA() throws Exception {
    Configuration localConfig = new Configuration();
    localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment ste = StreamTableEnvironment.create(env);

    Kafka kafka08 = new Kafka()
            .version("0.8")
            .topic("BDP-ChargingMinuteMetric")
            .startFromEarliest()
            .property("zookeeper.connect", "abc:2182/kafka08")
            .property("bootstrap.servers", "def:9095")
            .property("group.id", "abc");


    // Table schema: the rowtime attribute reuses the name of the physical field
    // UptTimeMs, which the field mapping then fails to resolve (see the
    // ValidationException above).
    Schema schema = new Schema()
            .field("UptTimeMs", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
            .field("BillId", Types.STRING)
            .field("SOC", Types.DOUBLE)
            .field("HighestVoltage", Types.DOUBLE);


    TypeInformation<?>[] types = new TypeInformation<?>[]{Types.SQL_TIMESTAMP, Types.STRING, Types.DOUBLE, Types.DOUBLE};
    String[] fieldNames = new String[]{"UptTimeMs", "BillId", "SOC", "HighestVoltage"};
    TypeInformation<Row> typeInformation = new RowTypeInfo(types, fieldNames);
    FormatDescriptor formatDescriptor = new Json().failOnMissingField(false).schema(typeInformation).deriveSchema();

    ste.connect(kafka08).withFormat(formatDescriptor).withSchema(schema).inAppendMode().registerTableSource("SourceTable");
    Table table = ste.sqlQuery("select * from SourceTable");

    DataStream<Row> rowDataStream = ste.toAppendStream(table, Row.class);
    rowDataStream.print();

    ste.execute("ABC");
}



From stevenchen
         webchat 38798579