rowtime/proctime

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

rowtime/proctime

somnussuy

在执行 SQL 的时候遇见如下情况:
 streamTableEnvironment.registerDataStream()注册的字段有一个是 over_time.rowtime
使用 sql:select tumble(over_time,interval '1' second),over_time from kafka_source group by over_time,tumble(over_time,interval '1' second)  会报错。ClassCastException 
如果 将 group by 后面的子句交换位置, group by tumble(over_time,interval '1' second) 能正常运行
如果 将 over_time 设置为 proctime,group by 之后无论子句的位置任何,均可以正常运行。
请问:
1. 两种 时间属性 对于该条 SQL 的影响是?
附录:
报错信息:
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
        at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
        at org.apache.flink.table.runtime.RowtimeProcessFunction.processElement(RowtimeProcessFunction.scala:43)
        at org.apache.flink.table.runtime.RowtimeProcessFunction.processElement(RowtimeProcessFunction.scala:32)
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
        at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
        at DataStreamCalcRule$50.processElement(Unknown Source)
        at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
        at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
        at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
        at DataStreamSourceConversion$45.processElement(Unknown Source)
        at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)