Login  Register

答复: 为何会报"Window can only be defined over a time attribute column."??

Posted by Chennet Steven on Jun 21, 2019; 6:45am
URL: http://apache-flink.370.s1.nabble.com/Window-can-only-be-defined-over-a-time-attribute-column-tp13p14.html

貌似问题变成这样子

程序流程如下:WM=watermark
source(没有WM)-->转换A-->TableA(没有WM)--->转换B--->TableB(没有WM)---->转换C(sql TUMBLE)-->TableC-->Sink
为了让转换C的Sql能够成功执行,如何在TableB上Assign一个Watermark??


From stevenchen
         webchat 38798579

________________________________
发件人: Chennet Steven
发送时间: Friday, June 21, 2019 11:34:25 AM
收件人: [hidden email]
主题: 为何会报"Window can only be defined over a time attribute column."??


Flink 1.4.2

执行Sql SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR),Tag
会报异常,但是感觉UptMs列类型是TimeStamp啊?
Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
         at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:69)
         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:82)
         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:87)
         at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
         at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
         at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:506)
         at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:385)
         at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
         at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
         at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
         at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:197)
         at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:257)
         at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:665)
         at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
         at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
         at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
         at com.teld.bdp.tablewindow.TubleApp$.main(TubleApp.scala:56)
         at com.teld.bdp.tablewindow.TubleApp.main(TubleApp.scala)
Disconnected from the target VM, address: '127.0.0.1:60155', transport: 'socket'


代码如下


package com.teld.bdp.tablewindow

import java.io.Serializable

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.api.java.tuple.Tuple3
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

import org.apache.flink.table.api.TableEnvironment

// Flink 1.4.2
object TubleApp {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.createLocalEnvironment(1)
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = TableEnvironment.getTableEnvironment(senv)
    tableEnv.registerFunction("TOTIMESTAMP", new ConvertLongToTimeStamp())

    // 2019-06-20 10:27:57 1560997677000
    // 2019-06-20 11:27:57 1561001277000
    // 2019-06-20 12:27:57 1561004877000
    val source = senv.fromElements(
      new Tuple3("a", 1, 1560997677000L),
      new Tuple3("b", 1, 1560997677000L),

      new Tuple3("a", 2, 1561001277000L),
      new Tuple3("b", 2, 1561001277000L),

      new Tuple3("a", 3, 1561004877000L),
      new Tuple3("b", 3, 1561004877000L)
    )

    import org.apache.flink.table.api.scala._
    tableEnv.registerDataStream("Temp", source, 'Tag, 'PowerX, 'UptTime)

    //-----------------------------源Table-----------------------
    val sourceTable = tableEnv.sqlQuery("select * from Temp")
    // root
    // |-- Tag: String
    // |-- PowerX: Integer
    // |-- UptTime: Long
    sourceTable.printSchema()
    tableEnv.registerTable("SourceTable", sourceTable)

    //-----------------------------转换成TimeStamp-----------------------
    val timeTable = tableEnv.sqlQuery("SELECT Tag, PowerX, UptTime, TOTIMESTAMP(UptTime) AS UptMs FROM SourceTable")
    // root
    // |-- Tag: String
    // |-- PowerX: Integer
    // |-- UptTime: Long
    // |-- UptMs: Timestamp
    timeTable.printSchema()
    tableEnv.registerTable("TimeTable", timeTable)
    tableEnv.toAppendStream[Row](timeTable).print()

    //------------------------------agg-------------------------------------
    val aggTable = tableEnv.sqlQuery("SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR),Tag")
    // root
    // |-- Tag: String
    // |-- EXPR$1: Integer
    aggTable.printSchema()
    // 为啥这下面这个聚合的table会抛异常啊?
    // Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
    tableEnv.toRetractStream[Row](aggTable).print()


    senv.execute("abc")
  }
}

class ConvertLongToTimeStamp extends ScalarFunction with Serializable {
  def eval(uptTime: Long): Long = uptTime

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.SQL_TIMESTAMP
}



From stevenchen
         webchat 38798579