Flink 1.4.2: executing the SQL

    SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR), Tag

throws an exception, yet the UptMs column's type seems to be Timestamp?

Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
        at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:69)
        at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:82)
        at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:87)
        at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:506)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:385)
        at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
        at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
        at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:197)
        at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:257)
        at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:665)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
        at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
        at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
        at com.teld.bdp.tablewindow.TubleApp$.main(TubleApp.scala:56)
        at com.teld.bdp.tablewindow.TubleApp.main(TubleApp.scala)
Disconnected from the target VM, address: '127.0.0.1:60155', transport: 'socket'

The code is as follows:

package com.teld.bdp.tablewindow

import java.io.Serializable
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.api.java.tuple.Tuple3
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
import org.apache.flink.table.api.TableEnvironment

// Flink 1.4.2
object TubleApp {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.createLocalEnvironment(1)
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = TableEnvironment.getTableEnvironment(senv)
    tableEnv.registerFunction("TOTIMESTAMP", new ConvertLongToTimeStamp())

    // 2019-06-20 10:27:57  1560997677000
    // 2019-06-20 11:27:57  1561001277000
    // 2019-06-20 12:27:57  1561004877000
    val source = senv.fromElements(
      new Tuple3("a", 1, 1560997677000L),
      new Tuple3("b", 1, 1560997677000L),
      new Tuple3("a", 2, 1561001277000L),
      new Tuple3("b", 2, 1561001277000L),
      new Tuple3("a", 3, 1561004877000L),
      new Tuple3("b", 3, 1561004877000L)
    )

    import org.apache.flink.table.api.scala._
    tableEnv.registerDataStream("Temp", source, 'Tag, 'PowerX, 'UptTime)

    //----------------------------- source table -----------------------
    val sourceTable = tableEnv.sqlQuery("select * from Temp")
    // root
    //  |-- Tag: String
    //  |-- PowerX: Integer
    //  |-- UptTime: Long
    sourceTable.printSchema()
    tableEnv.registerTable("SourceTable", sourceTable)

    //----------------------------- convert to Timestamp -----------------------
    val timeTable = tableEnv.sqlQuery("SELECT Tag, PowerX, UptTime, TOTIMESTAMP(UptTime) AS UptMs FROM SourceTable")
    // root
    //  |-- Tag: String
    //  |-- PowerX: Integer
    //  |-- UptTime: Long
    //  |-- UptMs: Timestamp
    timeTable.printSchema()
    tableEnv.registerTable("TimeTable", timeTable)
    tableEnv.toAppendStream[Row](timeTable).print()

    //------------------------------ agg -------------------------------------
    val aggTable = tableEnv.sqlQuery("SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR), Tag")
    // root
    //  |-- Tag: String
    //  |-- EXPR$1: Integer
    aggTable.printSchema()
    // Why does this aggregated table throw an exception?
    // Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
    tableEnv.toRetractStream[Row](aggTable).print()

    senv.execute("abc")
  }
}

class ConvertLongToTimeStamp extends ScalarFunction with Serializable {
  def eval(uptTime: Long): Long = uptTime

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.SQL_TIMESTAMP
}

From stevenchen
webchat 38798579
It seems the question has now turned into this.

The program flow is as follows (WM = watermark):

source (no WM) --> transform A --> TableA (no WM) --> transform B --> TableB (no WM) --> transform C (SQL TUMBLE) --> TableC --> Sink

To make the SQL in transform C execute successfully, how do I assign a watermark on TableB?

From stevenchen
webchat 38798579

________________________________
From: Chennet Steven
Sent: Friday, June 21, 2019 11:34:25 AM
To: [hidden email]
Subject: Why does it throw "Window can only be defined over a time attribute column."?

> Flink 1.4.2: executing the SQL SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR), Tag throws an exception, yet the UptMs column's type seems to be Timestamp?
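For reference, a minimal sketch of the usual fix, untested and written against the 1.4-era Scala API; the Scala tuple element type and the 5-second out-of-orderness bound are assumptions for illustration. A group window can only reference a time attribute that was declared when the table was created from the DataStream; a TIMESTAMP column produced later by a scalar UDF does not qualify, and a watermark cannot be attached to an already-registered Table. So the watermark is assigned on the source stream and the attribute is declared at registration:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Assign event-time timestamps and watermarks on the DataStream itself.
val withWm = source.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[(String, Int, Long)](Time.seconds(5)) {
    override def extractTimestamp(e: (String, Int, Long)): Long = e._3
  })

// 'UptTime.rowtime declares the event-time attribute during registration;
// TUMBLE(UptTime, INTERVAL '1' HOUR) can then reference it directly, and the
// attribute survives intermediate SELECTs that forward the column unchanged.
tableEnv.registerDataStream("Temp", withWm, 'Tag, 'PowerX, 'UptTime.rowtime)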
I am trying to use State in a Flink custom aggregate function, but in the open method the FunctionContext gives no access to the RuntimeContext.
How can State be used in an aggregate function?

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}
import java.lang.{Iterable => JIterable}

class IntDiffSumAccumulator extends JTuple2[Int, Boolean]

class IntDiffSumFunction extends AggregateFunction[Int, IntDiffSumAccumulator] {

  override def open(context: FunctionContext): Unit = {
    // Flink 1.7.2: the RuntimeContext cannot be reached here, so State cannot be initialized
    //getRuntimeContext.getState(desc)
    val a = this.hashCode()
    print(s"hashCode:$a")
    super.open(context)
  }

  override def createAccumulator(): IntDiffSumAccumulator = {
    val acc = new IntDiffSumAccumulator()
    acc.f0 = 0
    acc.f1 = false
    acc
  }

  def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
    accumulator.f0 += value
    accumulator.f1 = true
  }

  override def getValue(accumulator: IntDiffSumAccumulator): Int = {
    if (accumulator.f1) {
      accumulator.f0
    } else {
      Int.MinValue
    }
  }

  def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]) = {
    val iter = its.iterator()
    while (iter.hasNext) {
      val a = iter.next()
      if (a.f1) {
        acc.f0 += a.f0
        acc.f1 = true
      }
    }
  }

  def resetAccumulator(acc: IntDiffSumAccumulator) = {
    acc.f0 = 0
    acc.f1 = false
  }

  override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
    new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO)
}

From stevenchen
webchat 38798579
You can try 1.9: it introduced the DataView mechanism, so state can be used inside the accumulator.
On Thu, 7 Nov 2019 at 09:22, Chennet Steven <[hidden email]> wrote:

> I am trying to use State in a Flink custom aggregate function, but in the open method the FunctionContext gives no access to the RuntimeContext. How can State be used in an aggregate function?
In flink-table-common of Flink 1.9 I found the DataView interface and its subclasses ListView and MapView, but I don't understand how to use them in a custom function.
Could you give a link to an example or to test code?

From stevenchen
webchat 38798579

________________________________
From: wenlong.lwl <[hidden email]>
Sent: Thursday, November 7, 2019 2:13:43 PM
To: [hidden email] <[hidden email]>
Subject: Re: Flink 1.7.2: how to use custom state in a Table API aggregate function

> You can try 1.9: it introduced the DataView mechanism, so state can be used inside the accumulator.
You can refer to an existing example in the Flink code base: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java
> On Nov 7, 2019, at 7:06 PM, Chennet Steven <[hidden email]> wrote:
>
> In flink-table-common of Flink 1.9 I found the DataView interface and its subclasses ListView and MapView, but I don't understand how to use them in a custom function. Could you give a link to an example or to test code?
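Following the pattern of that example, here is a minimal sketch of an accumulator that holds a MapView (untested; the function and field names are made up, and it mirrors the linked Java MaxWithRetractAggFunction rather than any official Scala example). As far as I understand, when the planner recognizes the accumulator as a composite type, the MapView field is transparently backed by keyed state; otherwise it degrades to an ordinary heap map:

import java.lang.{Boolean => JBool, Long => JLong}
import scala.beans.BeanProperty
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator: counts distinct strings, remembering the values
// already seen in a MapView instead of holding them all on the heap.
class CountDistinctAcc {
  @BeanProperty var seen: MapView[String, JBool] =
    new MapView[String, JBool](Types.STRING, Types.BOOLEAN)
  @BeanProperty var count: JLong = 0L
}

class CountDistinct extends AggregateFunction[JLong, CountDistinctAcc] {

  override def createAccumulator(): CountDistinctAcc = new CountDistinctAcc

  def accumulate(acc: CountDistinctAcc, value: String): Unit = {
    if (value != null && !acc.seen.contains(value)) {
      acc.seen.put(value, true)
      acc.count += 1
    }
  }

  override def getValue(acc: CountDistinctAcc): JLong = acc.count
}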
The scenario: with a one-minute window, compute each sensor's maximum temperature per minute, and at the same time compute the difference between the current minute's maximum and the previous minute's maximum, using flink-sql. I wanted to write a custom Table UDAF and use State inside it to store the previous minute's maximum temperature, but the RuntimeContext in the FunctionContext of the UDAF's open method is private and cannot be used. Also, a DataView is a property of the UDAF's accumulator, and a new accumulator is created for every window, so the previous window's result cannot be carried over to the next window through the ACC/DataView. Is my understanding correct?
What is the recommended way in Flink SQL to compute the difference of an aggregate between two windows?

From stevenchen
webchat 38798579

From: Dian Fu<mailto:[hidden email]>
Sent: Thursday, November 7, 2019 19:41
To: [hidden email]<mailto:[hidden email]>
Subject: Re: Flink 1.7.2: how to use custom state in a Table API aggregate function

> You can refer to an existing example in the Flink code base: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java
1) In the Table API & SQL, the RuntimeContext is not exposed to users, hence it is private.
2) For the difference of aggregated values between windows, check whether CEP can meet the requirement; see the documentation: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/match_recognize.html

> On Nov 13, 2019, at 3:35 PM, Chennet Steven <[hidden email]> wrote:
>
> What is the recommended way in Flink SQL to compute the difference of an aggregate between two windows?
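As a rough illustration of that route, an untested sketch of a MATCH_RECOGNIZE query, assuming a table minute_max(deviceid, ts, max_temperature) produced by the per-minute tumbling aggregation, with ts a rowtime attribute (syntax details may need adjusting):

SELECT *
FROM minute_max
    MATCH_RECOGNIZE (
        PARTITION BY deviceid
        ORDER BY ts
        MEASURES
            B.ts AS ts,
            B.max_temperature - A.max_temperature AS diff_temperature
        ONE ROW PER MATCH
        AFTER MATCH SKIP TO LAST B   -- row B becomes the next match's A
        PATTERN (A B)                -- any two consecutive minutes
        DEFINE
            B AS TRUE                -- every row qualifies
    )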
This scenario should be computable with standard SQL. The general idea:

1) The inner query computes each device's per-minute maximum temperature: max(temp) as max_temperature plus a TUMBLE window.
2) The middle query uses a row OVER window to get the current minute's max_temperature together with the sum of the current and previous minute's maxima, i.e. SUM(max_temperature) AS sum_temperature.
3) The outermost query then simply selects 2 * max_temperature - sum_temperature, which is exactly the difference between the maxima of the two consecutive minutes that you need.

Assume the input messages have three fields:

Ts: timestamp
Deviceid: device id
Temp: device temperature

The complete SQL:

INSERT INTO mysink
SELECT ts, deviceid, 2 * max_temperature - sum_temperature AS diff_temperature
FROM (
    SELECT deviceid, ts, max_temperature,
        SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature
    FROM (
        SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) AS ts, deviceid, max(temp) AS max_temperature
        FROM mysrc
        GROUP BY TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid
    )
)

With the following test data:

"20000,dev1,1.2",
"50000,dev1,1.3",
"60000,dev1,1.4",
"100000,dev1,1.5",
"110000,dev1,1.6",
"120000,dev1,1.7"

the run produces:

{"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}

If you want to verify this approach end to end, you can:
1) open http://creek.baidubce.com/
2) paste the job definition (JSON) at the end of this mail into the job input box
3) click the generate-executable button, pick your OS and CPU arch in the dialog, and confirm

After a few seconds the system generates a complete executable; run it and you will see the results on the console. If you need to try more data, change the source type to STDIN so that you can type input on the command line.

Job definition (JSON):

{
    "comment": {
        "description": "The inner query computes each device's per-minute maximum temperature (max + TUMBLE window); the outer query computes, per device, the difference between the maxima of two consecutive minutes (row OVER window). The difference is computed as: current window maximum x 2 - sum of the two windows' maxima. This example uses pre-configured input data (source type=COLLECTION); to try more input, change the type to STDIN to read data from standard input.",
        "sample input": "1000,dev1,2.3",
        "sample output": {"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
    },
    "sources": [{
        "schema": {
            "format": "CSV",
            "fields": [
                { "name": "ts", "type": "SQL_TIMESTAMP" },
                { "name": "deviceid", "type": "STRING" },
                { "name": "temp", "type": "DOUBLE" }
            ]
        },
        "watermark": 0,
        "name": "mysrc",
        "eventTime": "ts",
        "type": "COLLECTION",
        "attr": {
            "input": [
                "10000,dev1,1.1",
                "20000,dev1,1.2",
                "50000,dev1,1.3",
                "60000,dev1,1.4",
                "100000,dev1,1.5",
                "110000,dev1,1.6",
                "120000,dev1,1.7"
            ]
        }
    }],
    "sink": {
        "schema": { "format": "JSON" },
        "name": "mysink",
        "type": "STDOUT"
    },
    "name": "demojob",
    "timeType": "EVENTTIME",
    "sql": "INSERT INTO mysink SELECT ts, deviceid, 2 * max_temperature - sum_temperature AS diff_temperature FROM ( SELECT deviceid, ts, max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS max_temperature from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid)) "
}

-----Original Message-----
From: Chennet Steven <[hidden email]>
Sent: Wednesday, November 13, 2019 3:36 PM
To: [hidden email]
Subject: Re: Flink 1.7.2: how to use custom state in a Table API aggregate function

> What is the recommended way in Flink SQL to compute the difference of an aggregate between two windows?
Yuan, many thanks for the reply and the solution. I tried it in code and it does work. But when computing diff_temperature for the earliest minute, since there is no earlier minute of data, diff_temperature comes out as that first minute's temperature itself. Is there a way to set it to null?

The run produces:

{"deviceid":"dev1","diff_temperature":1.3,"ts":59999}  ---- can this minute's 1.3 be set to null?
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}

From stevenchen
webchat 38798579

________________________________
From: Yuan,Youjun <[hidden email]>
Sent: Wednesday, November 13, 2019 11:34:53 PM
To: [hidden email] <[hidden email]>
Subject: Re: Flink 1.7.2: how to use custom state in a Table API aggregate function

> This scenario should be computable with standard SQL.
SQL has no way to express this "earliest minute" logic.
If you insert a message with temperature=0 at the very beginning of your stream, the first output becomes diff_temperature=0; I am not sure whether that is acceptable for you.

From: Chennet Steven <[hidden email]>
Sent: Thursday, November 14, 2019 5:32 PM
To: [hidden email]; Yuan,Youjun <[hidden email]>
Subject: Re: Flink 1.7.2: how to use custom state in a Table API aggregate function

> Yuan, many thanks for the reply and the solution. I tried it in code and it does work. But when computing diff_temperature for the earliest minute, since there is no earlier minute of data, diff_temperature comes out as that first minute's temperature itself. Is there a way to set it to null?
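An untested standard-SQL alternative that yields NULL for the first window instead of priming the stream with a dummy record: additionally count the rows seen by the same OVER window, and null the result out while only one window has been seen. This is a sketch on top of the earlier query; whether the sink accepts NULL values is a separate question:

INSERT INTO mysink
SELECT ts, deviceid,
    CASE WHEN cnt > 1
         THEN 2 * max_temperature - sum_temperature
         ELSE CAST(NULL AS DOUBLE)
    END AS diff_temperature
FROM (
    SELECT deviceid, ts, max_temperature,
        SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature,
        COUNT(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS cnt
    FROM (
        SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) AS ts, deviceid, max(temp) AS max_temperature
        FROM mysrc
        GROUP BY TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid
    )
)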
I just upgraded from 1.7 to 1.9 and the way Kafka is read seems to have changed. I could not find an example of reading Kafka with 1.9. Could anyone share a demo link?

The following code does not run under 1.9; it reports:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'UptTimeMs' could not be resolved by the field mapping.
        at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
        at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
        at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
        at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
        at org.apache.flink.table.sources.TableSourceValidation.validateTimestampExtractorArguments(TableSourceValidation.java:204)
        at org.apache.flink.table.sources.TableSourceValidation.validateTableSource(TableSourceValidation.java:70)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.validateTableSource(TableEnvironmentImpl.java:435)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.validateTableSource(StreamTableEnvironmentImpl.java:329)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSourceInternal(TableEnvironmentImpl.java:516)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSource(TableEnvironmentImpl.java:200)
        at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:70)
        at com.teld.demo.Kafka08App.FunA(Kafka08App.java:69)
        at com.teld.demo.Kafka08App.main(Kafka08App.java:23)

The code:

private static void FunA() throws Exception {
    Configuration localConfig = new Configuration();
    localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment ste = StreamTableEnvironment.create(env);

    Kafka kafka08 = new Kafka()
            .version("0.8")
            .topic("BDP-ChargingMinuteMetric")
            .startFromEarliest()
            .property("zookeeper.connect", "hdpjntest.chinacloudapp.cn:2182/kafka08")
            .property("bootstrap.servers", "telddruidteal.chinacloudapp.cn:9095")
            .property("group.id", "abc");

    Schema schema = new Schema()
            .field("UptTimeMs", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
            .field("BillId", Types.STRING)
            .field("SOC", Types.DOUBLE)
            .field("HighestVoltage", Types.DOUBLE);

    TypeInformation<?>[] types = new TypeInformation<?>[]{Types.SQL_TIMESTAMP, Types.STRING, Types.DOUBLE, Types.DOUBLE};
    String[] fieldNames = new String[]{"UptTimeMs", "BillId", "SOC", "HighestVoltage"};
    TypeInformation<Row> typeInformation = new RowTypeInfo(types, fieldNames);
    FormatDescriptor formatDescriptor = new Json().failOnMissingField(false).schema(typeInformation).deriveSchema();

    ste.connect(kafka08).withFormat(formatDescriptor).withSchema(schema).inAppendMode().registerTableSource("SourceTable");

    Table table = ste.sqlQuery("select * from SourceTable");
    DataStream<Row> rowDataStream = ste.toAppendStream(table, Row.class);
    rowDataStream.print();

    ste.execute("ABC");
}

From stevenchen
webchat 38798579
A Kafka value looks like this:

{
    "BillID": "230c95c6-346c-4070-9b49-b3bbbf6691db",
    "BillCode": "201912230300118165",
    "UptTimeMs": 1577091480000,
    "SOC": 0.86,
    "HighestVoltage": 4.019999980926514
}

where "UptTimeMs": 1577091480000 is milliseconds since 1970.

The code, modified as follows:

private static void FunA() throws Exception {
    Configuration localConfig = new Configuration();
    localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment ste = StreamTableEnvironment.create(env);

    Kafka kafka08 = new Kafka()
            .version("0.8")
            .topic("BDP-ChargingMinuteMetric")
            .startFromEarliest()
            .property("zookeeper.connect", "def:2182/ ")
            .property("bootstrap.servers", "abc:9095")
            .property("group.id", "abc");

    Schema schema = new Schema()
            .field("rowtime", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
            .field("BillID", Types.STRING)
            .field("SOC", Types.DOUBLE)
            .field("HighestVoltage", Types.DOUBLE);

    TypeInformation<?>[] types = new TypeInformation<?>[]{Types.SQL_TIMESTAMP, Types.STRING, Types.DOUBLE, Types.DOUBLE};
    String[] fieldNames = new String[]{"UptTimeMs", "BillId", "SOC", "HighestVoltage"};
    TypeInformation<Row> typeInformation = new RowTypeInfo(types, fieldNames);
    FormatDescriptor formatDescriptor = new Json().failOnMissingField(true).schema(typeInformation).deriveSchema();

    ste.connect(kafka08).withFormat(formatDescriptor).withSchema(schema).inAppendMode().registerTableSource("SourceTable");

    Table table = ste.sqlQuery("select * from SourceTable");
    DataStream<Row> rowDataStream = ste.toAppendStream(table, Row.class);
    rowDataStream.print();

    ste.execute("ABC");
}

now reports:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
        at org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410)
        at com.teld.demo.Kafka08App.FunA(Kafka08App.java:75)
        at com.teld.demo.Kafka08App.main(Kafka08App.java:23)
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON object.
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to deserialize JSON object.
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
        at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:374)
Caused by: java.time.format.DateTimeParseException: Text '1577085840000' could not be parsed at index 0
        at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
        at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)

Looking at the source:
https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
I see that the rowtime field must be of type SQL_TIMESTAMP, and SQL_TIMESTAMP values are parsed with RFC3339_TIMESTAMP_FORMAT, which expects a String. When the value is milliseconds since 1970, how should this be handled?

From stevenchen
webchat 38798579

From: Chennet Steven<mailto:[hidden email]>
Sent: Tuesday, December 24, 2019 15:53
To: [hidden email]<mailto:[hidden email]>
Subject: Flink 1.9.1: how to read Kafka 0.8 JSON data with the Table API

> I just upgraded from 1.7 to 1.9 and the way Kafka is read seems to have changed. I could not find an example of reading Kafka with 1.9. Could anyone share a demo link?
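One workaround that sidesteps the JSON format's RFC 3339 timestamp parsing entirely (a sketch in Scala, not verified against 1.9.1; kafkaStream and tableEnv are placeholders for a DataStream built from a FlinkKafkaConsumer whose JSON schema reads UptTimeMs as a plain Long, and for the StreamTableEnvironment): assign timestamps and watermarks on the DataStream, then declare the rowtime attribute when converting to a Table.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._

// Hypothetical record type matching the JSON payload.
case class Metric(UptTimeMs: Long, BillID: String, SOC: Double, HighestVoltage: Double)

val withTs = kafkaStream.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[Metric](Time.seconds(1)) {
    override def extractTimestamp(m: Metric): Long = m.UptTimeMs
  })

// Declaring 'UptTimeMs.rowtime replaces the Long field with an event-time
// attribute taken from the stream-record timestamps assigned above, so the
// JSON format never has to parse the epoch-millis value as a SQL timestamp.
val table = tableEnv.fromDataStream(withTs,
  'UptTimeMs.rowtime, 'BillID, 'SOC, 'HighestVoltage)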