Posted by
Chennet Steven on
Jun 21, 2019; 6:45am
URL: http://apache-flink.370.s1.nabble.com/Window-can-only-be-defined-over-a-time-attribute-column-tp13p14.html
貌似问题变成这样子
程序流程如下:WM=watermark
source(没有WM)-->转换A-->TableA(没有WM)--->转换B--->TableB(没有WM)---->转换C(sql TUMBLE)-->TableC-->Sink
为了让转换C的Sql能够成功执行,如何在TableB上Assign一个Watermark??
From stevenchen
webchat 38798579
________________________________
发件人: Chennet Steven
发送时间: Friday, June 21, 2019 11:34:25 AM
收件人:
[hidden email]
主题: 为何会报"Window can only be defined over a time attribute column."??
Flink 1.4.2
执行Sql SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR),Tag
会报异常,但是感觉UptMs列类型是TimeStamp啊?
Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:69)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:82)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:87)
at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:506)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:385)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:197)
at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:257)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:665)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at com.teld.bdp.tablewindow.TubleApp$.main(TubleApp.scala:56)
at com.teld.bdp.tablewindow.TubleApp.main(TubleApp.scala)
Disconnected from the target VM, address: '127.0.0.1:60155', transport: 'socket'
代码如下
package com.teld.bdp.tablewindow
import java.io.Serializable
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.api.java.tuple.Tuple3
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
import org.apache.flink.table.api.TableEnvironment
// Flink 1.4.2
object TubleApp {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.createLocalEnvironment(1)
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnv = TableEnvironment.getTableEnvironment(senv)
tableEnv.registerFunction("TOTIMESTAMP", new ConvertLongToTimeStamp())
// 2019-06-20 10:27:57 1560997677000
// 2019-06-20 11:27:57 1561001277000
// 2019-06-20 12:27:57 1561004877000
val source = senv.fromElements(
new Tuple3("a", 1, 1560997677000L),
new Tuple3("b", 1, 1560997677000L),
new Tuple3("a", 2, 1561001277000L),
new Tuple3("b", 2, 1561001277000L),
new Tuple3("a", 3, 1561004877000L),
new Tuple3("b", 3, 1561004877000L)
)
import org.apache.flink.table.api.scala._
tableEnv.registerDataStream("Temp", source, 'Tag, 'PowerX, 'UptTime)
//-----------------------------源Table-----------------------
val sourceTable = tableEnv.sqlQuery("select * from Temp")
// root
// |-- Tag: String
// |-- PowerX: Integer
// |-- UptTime: Long
sourceTable.printSchema()
tableEnv.registerTable("SourceTable", sourceTable)
//-----------------------------转换成TimeStamp-----------------------
val timeTable = tableEnv.sqlQuery("SELECT Tag, PowerX, UptTime, TOTIMESTAMP(UptTime) AS UptMs FROM SourceTable")
// root
// |-- Tag: String
// |-- PowerX: Integer
// |-- UptTime: Long
// |-- UptMs: Timestamp
timeTable.printSchema()
tableEnv.registerTable("TimeTable", timeTable)
tableEnv.toAppendStream[Row](timeTable).print()
//------------------------------agg-------------------------------------
val aggTable = tableEnv.sqlQuery("SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR),Tag")
// root
// |-- Tag: String
// |-- EXPR$1: Integer
aggTable.printSchema()
// 为啥这下面这个聚合的table会抛异常啊?
// Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
tableEnv.toRetractStream[Row](aggTable).print()
senv.execute("abc")
}
}
class ConvertLongToTimeStamp extends ScalarFunction with Serializable {
def eval(uptTime: Long): Long = uptTime
override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.SQL_TIMESTAMP
}
From stevenchen
webchat 38798579