SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], fields=[time, sum_age])
+- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
   +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
      :- FlinkLogicalCalc(select=[uid, time])
      :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
      +- FlinkLogicalSnapshot(period=[$cor0.time])
         +- FlinkLogicalCalc(select=[uid, age])
            +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, age, created_time)]]], fields=[uid, sex, age, created_time])

Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
Please check the documentation for the set of currently supported SQL features.
	at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
	at org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
	at org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
Caused by: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
	at org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
	at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
	at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
	at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
	at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
	... 20 more

query:

val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)

streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    `time` TIMESTAMP(3)
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)
streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_cnt (
    |    `time` VARCHAR,
    |    sum_age INT
    |) WITH (
    |    'connector.type' = 'jdbc',
    |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
    |    'connector.table' = 'user_cnt',
    |    'connector.username' = 'root',
    |    'connector.password' = '123456',
    |    'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)
val userTableSource = new MysqlAsyncLookupTableSource(
  Array("uid", "sex", "age", "created_time"),
  Array(),
  Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
streamTableEnv.registerTableSource("users", userTableSource)
streamTableEnv.sqlUpdate(
  """
    |insert into user_cnt
    |SELECT
    |  cast(b.`time` as string), u.age
    |FROM
    |  user_behavior AS b
    |  JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
    |  ON b.uid = u.uid
    |""".stripMargin)
streamTableEnv.execute("Temporal table join")
Hi,
A temporal table join needs a processing-time attribute. Your b.`time` is a plain timestamp column, not a time attribute. See [1].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
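For what it's worth, a minimal sketch of the corrected join, assuming user_behavior is given a processing-time attribute named proctime (declared as a computed column, as discussed further down this thread); FOR SYSTEM_TIME AS OF then references that attribute rather than the plain `time` column:

streamTableEnv.sqlUpdate(
  """
    |insert into user_cnt
    |SELECT
    |  cast(b.`time` as string), u.age
    |FROM
    |  user_behavior AS b
    |  -- FOR SYSTEM_TIME AS OF must point at a proctime attribute, not a plain TIMESTAMP column
    |  JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u
    |  ON b.uid = u.uid
    |""".stripMargin)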
flink 1.10.0:
Adding a PROCTIME() AS proctime column in the CREATE TABLE fails with an error.
The join only works against a proctime attribute. See: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
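For concreteness, an untested sketch of the user_behavior DDL extended the way that page describes, with a processing-time attribute declared as a computed column (the name proctime is only a convention):

streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    `time` TIMESTAMP(3),
    |    -- computed column declaring a processing-time attribute
    |    proctime AS PROCTIME()
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)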
In reply to this post by Zhou Zach
You have it backwards: it should be proctime AS PROCTIME().
For a computed column, the AS runs in the opposite direction to the AS in an ordinary query.
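To make the direction of AS explicit, a small sketch reusing the tables from this thread (the val name is arbitrary):

// DDL computed column: the NEW column name stands on the left of AS:
//     proctime AS PROCTIME()
// Query alias: the expression stands on the left, the new name on the right:
val behaviorWithProctime = streamTableEnv.sqlQuery(
  """
    |SELECT uid, PROCTIME() AS proctime
    |FROM user_behavior
    |""".stripMargin)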
In reply to this post by Zhou Zach
Have a look at the docs on time attributes in SQL; judging from your description, the usage is wrong: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
In reply to this post by Benchao Li-2
Still not working:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "time FROM" at line 1, column 44.
Was expecting one of: "CURSOR" ... "EXISTS" ... "NOT" ... "ROW" ... "(" ... "+" ... "-" ... <UNSIGNED_INTEGER_LITERAL> ... <DECIMAL_NUMERIC_LITERAL> ... <APPROX_NUMERIC_LITERAL> ... <BINARY_STRING_LITERAL> ... <PREFIXED_STRING_LITERAL> ... <QUOTED_STRING> ... <UNICODE_STRING_LITERAL> ... "TRUE" ... "FALSE" ... "UNKNOWN" ... "NULL" ... <LBRACE_D> ... <LBRACE_T> ... <LBRACE_TS> ... "DATE" ... "TIME" <QUOTED_STRING> ... "TIMESTAMP" ... "INTERVAL" ... "?" ... "CAST" ... "EXTRACT" ... "POSITION" ... "CONVERT" ... "TRANSLATE" ... "OVERLAY" ... "FLOOR" ... "CEIL" ... "CEILING" ... "SUBSTRING" ... "TRIM" ... "CLASSIFIER" ... "MATCH_NUMBER" ... "RUNNING" ... "PREV" ... "NEXT" ... "JSON_EXISTS" ... "JSON_VALUE" ... "JSON_QUERY" ... "JSON_OBJECT" ... "JSON_OBJECTAGG" ... "JSON_ARRAY" ... "JSON_ARRAYAGG" ... <LBRACE_FN> ... "MULTISET" ... "ARRAY" ... "MAP" ... "PERIOD" ... "SPECIFIC" ... <IDENTIFIER> ... <QUOTED_IDENTIFIER> ... <BACK_QUOTED_IDENTIFIER> ... <BRACKET_QUOTED_IDENTIFIER> ... <UNICODE_QUOTED_IDENTIFIER> ... "ABS" ... "AVG" ... "CARDINALITY" ... "CHAR_LENGTH" ... "CHARACTER_LENGTH" ... "COALESCE" ... "COLLECT" ... "COVAR_POP" ... "COVAR_SAMP" ... "CUME_DIST" ... "COUNT" ... "CURRENT_DATE" ... "CURRENT_TIME" ... "CURRENT_TIMESTAMP" ... "DENSE_RANK" ... "ELEMENT" ... "EXP" ... "FIRST_VALUE" ... "FUSION" ... "GROUPING" ... "HOUR" ... "LAG" ... "LEAD" ... "LEFT" ... "LAST_VALUE" ... "LN" ... "LOCALTIME" ... "LOCALTIMESTAMP" ... "LOWER" ... "MAX" ... "MIN" ... "MINUTE" ... "MOD" ... "MONTH" ... "NTH_VALUE" ... "NTILE" ... "NULLIF" ... "OCTET_LENGTH" ... "PERCENT_RANK" ... "POWER" ... "RANK" ... "REGR_COUNT" ... "REGR_SXX" ... "REGR_SYY" ... "RIGHT" ... "ROW_NUMBER" ... "SECOND" ... "SQRT" ... "STDDEV_POP" ... "STDDEV_SAMP" ... "SUM" ... "UPPER" ... "TRUNCATE" ... "USER" ... "VAR_POP" ... "VAR_SAMP" ... "YEAR" ... "CURRENT_CATALOG" ... "CURRENT_DEFAULT_TRANSFORM_GROUP" ... "CURRENT_PATH" ... "CURRENT_ROLE" ... "CURRENT_SCHEMA" ... "CURRENT_USER" ... "SESSION_USER" ... "SYSTEM_USER" ... "NEW" ... "CASE" ... "CURRENT" ...
    at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
    at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
    at org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
    at org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)

query:

streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    proctime AS PROCTIME(),
    |    `time` TIMESTAMP(3)
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)
streamTableEnv.sqlUpdate(
  """
    |insert into user_cnt
    |SELECT
    |    cast(b.`time` as string), u.age
    |FROM
    |    user_behavior AS b
    |    JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
    |    ON b.uid = u.uid
    |""".stripMargin)

One thing, though: PROCTIME() AS proctime does run successfully when placed after SELECT, while proctime AS PROCTIME() placed after SELECT also fails.

On 2020-06-12 15:29:49, "Benchao Li" <[hidden email]> wrote:
> You have it backwards: it should be proctime AS PROCTIME().
> In a computed column, the AS is the reverse of the AS in an ordinary query.
>
> Zhou Zach <[hidden email]> wrote on Fri, Jun 12, 2020 at 14:24:
>
>> flink 1.10.0:
>> Adding a PROCTIME() AS proctime column in CREATE TABLE throws an error.
>>
>> On 2020-06-12 14:08:11, "Benchao Li" <[hidden email]> wrote:
>>> Hi,
>>>
>>> A temporal table join needs a processing-time attribute; your b.`time` here is an ordinary timestamp, not an event-time attribute. You can refer to [1].
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
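Putting the two quoted pieces of advice side by side: in DDL, a computed column is written as column name first and expression second, while in a query an alias is written as expression first and new name second. A minimal sketch of both, reusing the Kafka DDL from this thread (note this DDL is valid syntax, modulo the 1.10.0 reserved-keyword bug that surfaces below):

-- DDL: computed column, column name first
CREATE TABLE user_behavior (
    uid VARCHAR,
    phoneType VARCHAR,
    clickCount INT,
    `time` TIMESTAMP(3),
    proctime AS PROCTIME()
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

-- Query: alias, expression first (the form that worked in the SELECT above)
SELECT uid, PROCTIME() AS proctime FROM user_behavior;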
On 2020-06-12 16:28:15, "Benchao Li" <[hidden email]> wrote:

It looks like you've hit another bug. You're on 1.10.0, right? Try switching to 1.10.1; two bugs have already been fixed in 1.10.1.
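For reference, switching versions is a one-line change if the build pins Flink through a property. A sketch of the relevant pom.xml fragment, assuming a Maven build like the .m2 paths in the logs suggest (the exact module list of this project is an assumption; artifact IDs are those of the 1.10.x Scala/Blink-planner setup):

<properties>
    <flink.version>1.10.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <!-- Blink planner, as selected by useBlinkPlanner() in the code above -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>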
On Jun 12, 2020 at 17:38, Zhou Zach <[hidden email]> wrote:

Yes, version 1.10.0.
You've hit exactly that bug: it's a bug in escaping the Flink reserved keyword (time), and it has been fixed in 1.10.1 and later versions (including the upcoming 1.11).

Best,
Leonard Xu
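Until such an upgrade lands, one workaround sketch on 1.10.0 is to avoid the reserved word entirely, e.g. by calling the column ts (a hypothetical rename; with 'format.derive-schema' = 'true' the JSON field must carry the same name, so this assumes the producer side can be renamed too):

CREATE TABLE user_behavior (
    uid VARCHAR,
    phoneType VARCHAR,
    clickCount INT,
    proctime AS PROCTIME(),
    ts TIMESTAMP(3)   -- renamed from `time` to sidestep the reserved word
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

INSERT INTO user_cnt
SELECT cast(b.ts as string), u.age
FROM user_behavior AS b
  JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u
  ON b.uid = u.uid;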
>>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ... >>> "CURRENT_PATH" ... >>> "CURRENT_ROLE" ... >>> "CURRENT_SCHEMA" ... >>> "CURRENT_USER" ... >>> "SESSION_USER" ... >>> "SYSTEM_USER" ... >>> "NEW" ... >>> "CASE" ... >>> "CURRENT" ... >>> >>> at >>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50) >>> at >>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79) >>> at >>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) >>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) >>> at >>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) >>> at >>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) >>> at >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) >>> at >>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63) >>> at >>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) >>> >>> >>> query: >>> >>> >>> streamTableEnv.sqlUpdate( >>> """ >>> | >>> |CREATE TABLE user_behavior ( >>> | uid VARCHAR, >>> | phoneType VARCHAR, >>> | clickCount INT, >>> | proctime AS PROCTIME(), >>> | `time` TIMESTAMP(3) >>> |) WITH ( >>> | 'connector.type' = 'kafka', >>> | 'connector.version' = 'universal', >>> | 'connector.topic' = 'user_behavior', >>> | 'connector.startup-mode' = 'earliest-offset', >>> | 'connector.properties.0.key' = 'zookeeper.connect', >>> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181', >>> | 'connector.properties.1.key' = 'bootstrap.servers', >>> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092', >>> | 'update-mode' = 'append', >>> | 'format.type' = 'json', >>> | 'format.derive-schema' = 'true' >>> |) >>> |""".stripMargin) >>> streamTableEnv.sqlUpdate( >>> """ >>> 
| >>> |insert into user_cnt >>> |SELECT >>> | cast(b.`time` as string), u.age >>> |FROM >>> | user_behavior AS b >>> | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u >>> | ON b.uid = u.uid >>> | >>> |""".stripMargin) >>> >>> >>> >>> >>> >>> >>> 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME() >>> 放在select 后面也不行。 >>> >>> >>> >>> >>> >>> >>> >>> >>> 在 2020-06-12 15:29:49,"Benchao Li" <[hidden email]> 写道: >>>> 你写反了,是proctime AS PROCTIME()。 >>>> 计算列跟普通query里面的AS是反着的。 >>>> >>>> Zhou Zach <[hidden email]> 于2020年6月12日周五 下午2:24写道: >>>> >>>>> flink 1.10.0: >>>>> 在create table中,加PROCTIME() AS proctime字段报错 >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> 在 2020-06-12 14:08:11,"Benchao Li" <[hidden email]> 写道: >>>>>> Hi, >>>>>> >>>>>> Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。 >>>>>> 可以参考下[1] >>>>>> >>>>>> [1] >>>>>> >>>>> >>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html >>>>>> >>>>>> Zhou Zach <[hidden email]> 于2020年6月12日周五 下午1:33写道: >>>>>> >>>>>>> SLF4J: Class path contains multiple SLF4J bindings. >>>>>>> >>>>>>> SLF4J: Found binding in >>>>>>> >>>>> >>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>>>> >>>>>>> SLF4J: Found binding in >>>>>>> >>>>> >>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>>>> >>>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>>>>>> explanation. >>>>>>> >>>>>>> SLF4J: Actual binding is of type >>>>>>> [org.apache.logging.slf4j.Log4jLoggerFactory] >>>>>>> >>>>>>> ERROR StatusLogger No log4j2 configuration file found. Using default >>>>>>> configuration: logging only errors to the console. >>>>>>> >>>>>>> Exception in thread "main" org.apache.flink.table.api.TableException: >>>>>>> Cannot generate a valid execution plan for the given query: >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], >>>>>>> fields=[time, sum_age]) >>>>>>> >>>>>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age]) >>>>>>> >>>>>>> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner]) >>>>>>> >>>>>>> :- FlinkLogicalCalc(select=[uid, time]) >>>>>>> >>>>>>> : +- FlinkLogicalTableSourceScan(table=[[default_catalog, >>>>>>> default_database, user_behavior, source: [KafkaTableSource(uid, >>>>> phoneType, >>>>>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time]) >>>>>>> >>>>>>> +- FlinkLogicalSnapshot(period=[$cor0.time]) >>>>>>> >>>>>>> +- FlinkLogicalCalc(select=[uid, age]) >>>>>>> >>>>>>> +- FlinkLogicalTableSourceScan(table=[[default_catalog, >>>>>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, >>> sex, >>>>>>> age, created_time)]]], fields=[uid, sex, age, created_time]) >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' >>> left >>>>>>> table's proctime field, doesn't support 'PROCTIME()' >>>>>>> >>>>>>> Please check the documentation for the set of currently supported SQL >>>>>>> features. 
>>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >>>>>>> >>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >>>>>>> >>>>>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >>>>>>> >>>>>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) >>>>>>> >>>>>>> at >>> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) >>>>>>> >>>>>>> Caused by: org.apache.flink.table.api.TableException: Temporal table >>>>> join >>>>>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime >>>>>>> field, doesn't support 'PROCTIME()' >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529) >>>>>>> >>>>>>> at >>>>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) >>>>>>> >>>>>>> ... 
20 more >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> query: >>>>>>> >>>>>>> >>>>>>> val streamExecutionEnv = >>>>> StreamExecutionEnvironment.getExecutionEnvironment >>>>>>> val blinkEnvSettings = >>>>>>> >>>>> >>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>>>>> val streamTableEnv = >>>>>>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings) >>>>>>> >>>>>>> streamTableEnv.sqlUpdate( >>>>>>> """ >>>>>>> | >>>>>>> |CREATE TABLE user_behavior ( >>>>>>> | uid VARCHAR, >>>>>>> | phoneType VARCHAR, >>>>>>> | clickCount INT, >>>>>>> | `time` TIMESTAMP(3) >>>>>>> |) WITH ( >>>>>>> | 'connector.type' = 'kafka', >>>>>>> | 'connector.version' = 'universal', >>>>>>> | 'connector.topic' = 'user_behavior', >>>>>>> | 'connector.startup-mode' = 'earliest-offset', >>>>>>> | 'connector.properties.0.key' = 'zookeeper.connect', >>>>>>> | 'connector.properties.0.value' = >>>>> 'cdh1:2181,cdh2:2181,cdh3:2181', >>>>>>> | 'connector.properties.1.key' = 'bootstrap.servers', >>>>>>> | 'connector.properties.1.value' = >>>>> 'cdh1:9092,cdh2:9092,cdh3:9092', >>>>>>> | 'update-mode' = 'append', >>>>>>> | 'format.type' = 'json', >>>>>>> | 'format.derive-schema' = 'true' >>>>>>> |) >>>>>>> |""".stripMargin) >>>>>>> streamTableEnv.sqlUpdate( >>>>>>> """ >>>>>>> | >>>>>>> |CREATE TABLE user_cnt ( >>>>>>> | `time` VARCHAR, >>>>>>> | sum_age INT >>>>>>> |) WITH ( >>>>>>> | 'connector.type' = 'jdbc', >>>>>>> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard', >>>>>>> | 'connector.table' = 'user_cnt', >>>>>>> | 'connector.username' = 'root', >>>>>>> | 'connector.password' = '123456', >>>>>>> | 'connector.write.flush.max-rows' = '1' >>>>>>> |) >>>>>>> |""".stripMargin) >>>>>>> val userTableSource = new MysqlAsyncLookupTableSource( >>>>>>> Array("uid", "sex", "age", "created_time"), >>>>>>> Array(), >>>>>>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING)) >>>>>>> streamTableEnv.registerTableSource("users", userTableSource) >>>>>>> streamTableEnv.sqlUpdate( >>>>>>> """ >>>>>>> | >>>>>>> |insert into user_cnt >>>>>>> |SELECT >>>>>>> | cast(b.`time` as string), u.age >>>>>>> |FROM >>>>>>> | user_behavior AS b >>>>>>> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u >>>>>>> | ON b.uid = u.uid >>>>>>> | >>>>>>> |""".stripMargin) >>>>>>> streamTableEnv.execute("Temporal table join") >>>>> >>> |
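To make Benchao's fix concrete, here is a minimal sketch of the declaration the thread converges on (Kafka connection properties elided; assumes Flink 1.10.1+, since on 1.10.0 the backtick-escaped `time` column still trips the parser bug discussed below). A computed column is written column-name-first, the reverse of the SELECT-list `expr AS alias` order, and the temporal join must then reference the declared attribute:

streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    `time` TIMESTAMP(3),
    |    proctime AS PROCTIME()  -- computed column: name first, expression after AS
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

streamTableEnv.sqlUpdate(
  """
    |INSERT INTO user_cnt
    |SELECT cast(b.`time` AS string), u.age
    |FROM user_behavior AS b
    |  JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u
    |  ON b.uid = u.uid
    |""".stripMargin)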
You can refer to this earlier mailing-list thread:
https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E


------------------ Original message ------------------
From: "Leonard Xu" <[hidden email]>
Sent: Friday, June 12, 2020, 17:43
To: "user-zh" <[hidden email]>
Subject: Re: flink sql Temporal table join failed


You have stepped right into this pit: it is a bug in escaping Flink's reserved keywords (time here), fixed in 1.10.1 and later versions (including the upcoming 1.11).

Best,
Leonard Xu

> On Jun 12, 2020, at 17:38, Zhou Zach <[hidden email]> wrote:
>
> Yes, version 1.10.0.
>
> On 2020-06-12 16:28:15, "Benchao Li" <[hidden email]> wrote:
>> It looks like you have stepped into yet another pit. You are on 1.10.0, right? Try switching to 1.10.1; two bugs are already fixed there.
>>
>> Zhou Zach <[hidden email]> wrote on Fri, Jun 12, 2020 at 15:47:
>>
>>> Still failing:
>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>>> SQL parse failed. Encountered "time FROM" at line 1, column 44.
>>> Was expecting one of:
>>>     "CURSOR" ...
>>>     "EXISTS" ...
>>>     "NOT" ...
>>> [keyword list, stack trace, and query trimmed; see the earlier copy of this message in the thread]
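For anyone pinned to 1.10.0, where that escaping fix is not available, a hypothetical workaround is to avoid the reserved word as a column name entirely; `ts` below is an illustrative rename, not a field from the thread, so the JSON payload (or an explicit schema mapping in place of 'format.derive-schema') would have to match it:

streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    ts TIMESTAMP(3),        -- renamed from `time`: no reserved word, nothing to escape
    |    proctime AS PROCTIME()
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)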
Thanks for the reminder.
在 2020-06-12 17:43:20,"Leonard Xu" <[hidden email]> 写道: > >你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。 > >祝好 >Leonard Xu > >> 在 2020年6月12日,17:38,Zhou Zach <[hidden email]> 写道: >> >> >> >> >> 是的,1.10.0版本 >> >> >> >> >> >> >> >> >> 在 2020-06-12 16:28:15,"Benchao Li" <[hidden email]> 写道: >>> 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。 >>> >>> Zhou Zach <[hidden email]> 于2020年6月12日周五 下午3:47写道: >>> >>>> 还是不行, >>>> SLF4J: Class path contains multiple SLF4J bindings. >>>> SLF4J: Found binding in >>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>> SLF4J: Found binding in >>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>>> explanation. >>>> SLF4J: Actual binding is of type >>>> [org.apache.logging.slf4j.Log4jLoggerFactory] >>>> ERROR StatusLogger No log4j2 configuration file found. Using default >>>> configuration: logging only errors to the console. >>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException: >>>> SQL parse failed. Encountered "time FROM" at line 1, column 44. >>>> Was expecting one of: >>>> "CURSOR" ... >>>> "EXISTS" ... >>>> "NOT" ... >>>> "ROW" ... >>>> "(" ... >>>> "+" ... >>>> "-" ... >>>> <UNSIGNED_INTEGER_LITERAL> ... >>>> <DECIMAL_NUMERIC_LITERAL> ... >>>> <APPROX_NUMERIC_LITERAL> ... >>>> <BINARY_STRING_LITERAL> ... >>>> <PREFIXED_STRING_LITERAL> ... >>>> <QUOTED_STRING> ... >>>> <UNICODE_STRING_LITERAL> ... >>>> "TRUE" ... >>>> "FALSE" ... >>>> "UNKNOWN" ... >>>> "NULL" ... >>>> <LBRACE_D> ... >>>> <LBRACE_T> ... >>>> <LBRACE_TS> ... >>>> "DATE" ... >>>> "TIME" <QUOTED_STRING> ... >>>> "TIMESTAMP" ... >>>> "INTERVAL" ... >>>> "?" ... >>>> "CAST" ... >>>> "EXTRACT" ... >>>> "POSITION" ... >>>> "CONVERT" ... >>>> "TRANSLATE" ... >>>> "OVERLAY" ... >>>> "FLOOR" ... >>>> "CEIL" ... >>>> "CEILING" ... >>>> "SUBSTRING" ... >>>> "TRIM" ... >>>> "CLASSIFIER" ... >>>> "MATCH_NUMBER" ... >>>> "RUNNING" ... >>>> "PREV" ... >>>> "NEXT" ... >>>> "JSON_EXISTS" ... >>>> "JSON_VALUE" ... >>>> "JSON_QUERY" ... >>>> "JSON_OBJECT" ... >>>> "JSON_OBJECTAGG" ... >>>> "JSON_ARRAY" ... >>>> "JSON_ARRAYAGG" ... >>>> <LBRACE_FN> ... >>>> "MULTISET" ... >>>> "ARRAY" ... >>>> "MAP" ... >>>> "PERIOD" ... >>>> "SPECIFIC" ... >>>> <IDENTIFIER> ... >>>> <QUOTED_IDENTIFIER> ... >>>> <BACK_QUOTED_IDENTIFIER> ... >>>> <BRACKET_QUOTED_IDENTIFIER> ... >>>> <UNICODE_QUOTED_IDENTIFIER> ... >>>> "ABS" ... >>>> "AVG" ... >>>> "CARDINALITY" ... >>>> "CHAR_LENGTH" ... >>>> "CHARACTER_LENGTH" ... >>>> "COALESCE" ... >>>> "COLLECT" ... >>>> "COVAR_POP" ... >>>> "COVAR_SAMP" ... >>>> "CUME_DIST" ... >>>> "COUNT" ... >>>> "CURRENT_DATE" ... >>>> "CURRENT_TIME" ... >>>> "CURRENT_TIMESTAMP" ... >>>> "DENSE_RANK" ... >>>> "ELEMENT" ... >>>> "EXP" ... >>>> "FIRST_VALUE" ... >>>> "FUSION" ... >>>> "GROUPING" ... >>>> "HOUR" ... >>>> "LAG" ... >>>> "LEAD" ... >>>> "LEFT" ... >>>> "LAST_VALUE" ... >>>> "LN" ... >>>> "LOCALTIME" ... >>>> "LOCALTIMESTAMP" ... >>>> "LOWER" ... >>>> "MAX" ... >>>> "MIN" ... >>>> "MINUTE" ... >>>> "MOD" ... >>>> "MONTH" ... >>>> "NTH_VALUE" ... >>>> "NTILE" ... >>>> "NULLIF" ... >>>> "OCTET_LENGTH" ... >>>> "PERCENT_RANK" ... >>>> "POWER" ... >>>> "RANK" ... >>>> "REGR_COUNT" ... >>>> "REGR_SXX" ... >>>> "REGR_SYY" ... >>>> "RIGHT" ... >>>> "ROW_NUMBER" ... 
>>>> "SECOND" ... >>>> "SQRT" ... >>>> "STDDEV_POP" ... >>>> "STDDEV_SAMP" ... >>>> "SUM" ... >>>> "UPPER" ... >>>> "TRUNCATE" ... >>>> "USER" ... >>>> "VAR_POP" ... >>>> "VAR_SAMP" ... >>>> "YEAR" ... >>>> "CURRENT_CATALOG" ... >>>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ... >>>> "CURRENT_PATH" ... >>>> "CURRENT_ROLE" ... >>>> "CURRENT_SCHEMA" ... >>>> "CURRENT_USER" ... >>>> "SESSION_USER" ... >>>> "SYSTEM_USER" ... >>>> "NEW" ... >>>> "CASE" ... >>>> "CURRENT" ... >>>> >>>> at >>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50) >>>> at >>>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79) >>>> at >>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) >>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) >>>> at >>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) >>>> at >>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) >>>> at >>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436) >>>> at >>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) >>>> at >>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) >>>> at >>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) >>>> at >>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) >>>> at >>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) >>>> at >>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63) >>>> at >>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) >>>> >>>> >>>> query: >>>> >>>> >>>> streamTableEnv.sqlUpdate( >>>> """ >>>> | >>>> |CREATE TABLE user_behavior ( >>>> | uid VARCHAR, >>>> | phoneType VARCHAR, >>>> | clickCount INT, >>>> | proctime AS PROCTIME(), >>>> | `time` TIMESTAMP(3) >>>> |) WITH ( >>>> | 'connector.type' = 'kafka', >>>> | 'connector.version' = 'universal', >>>> | 'connector.topic' = 'user_behavior', >>>> | 'connector.startup-mode' = 'earliest-offset', >>>> | 'connector.properties.0.key' = 'zookeeper.connect', >>>> | 'connector.properties.0.value' = 
'cdh1:2181,cdh2:2181,cdh3:2181', >>>> | 'connector.properties.1.key' = 'bootstrap.servers', >>>> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092', >>>> | 'update-mode' = 'append', >>>> | 'format.type' = 'json', >>>> | 'format.derive-schema' = 'true' >>>> |) >>>> |""".stripMargin) >>>> streamTableEnv.sqlUpdate( >>>> """ >>>> | >>>> |insert into user_cnt >>>> |SELECT >>>> | cast(b.`time` as string), u.age >>>> |FROM >>>> | user_behavior AS b >>>> | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u >>>> | ON b.uid = u.uid >>>> | >>>> |""".stripMargin) >>>> >>>> >>>> >>>> >>>> >>>> >>>> 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME() >>>> 放在select 后面也不行。 >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> 在 2020-06-12 15:29:49,"Benchao Li" <[hidden email]> 写道: >>>>> 你写反了,是proctime AS PROCTIME()。 >>>>> 计算列跟普通query里面的AS是反着的。 >>>>> >>>>> Zhou Zach <[hidden email]> 于2020年6月12日周五 下午2:24写道: >>>>> >>>>>> flink 1.10.0: >>>>>> 在create table中,加PROCTIME() AS proctime字段报错 >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> 在 2020-06-12 14:08:11,"Benchao Li" <[hidden email]> 写道: >>>>>>> Hi, >>>>>>> >>>>>>> Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。 >>>>>>> 可以参考下[1] >>>>>>> >>>>>>> [1] >>>>>>> >>>>>> >>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html >>>>>>> >>>>>>> Zhou Zach <[hidden email]> 于2020年6月12日周五 下午1:33写道: >>>>>>> >>>>>>>> SLF4J: Class path contains multiple SLF4J bindings. >>>>>>>> >>>>>>>> SLF4J: Found binding in >>>>>>>> >>>>>> >>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>>>>> >>>>>>>> SLF4J: Found binding in >>>>>>>> >>>>>> >>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>>>>> >>>>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>>>>>>> explanation. >>>>>>>> >>>>>>>> SLF4J: Actual binding is of type >>>>>>>> [org.apache.logging.slf4j.Log4jLoggerFactory] >>>>>>>> >>>>>>>> ERROR StatusLogger No log4j2 configuration file found. Using default >>>>>>>> configuration: logging only errors to the console. 
>>>>>>>> >>>>>>>> Exception in thread "main" org.apache.flink.table.api.TableException: >>>>>>>> Cannot generate a valid execution plan for the given query: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], >>>>>>>> fields=[time, sum_age]) >>>>>>>> >>>>>>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age]) >>>>>>>> >>>>>>>> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner]) >>>>>>>> >>>>>>>> :- FlinkLogicalCalc(select=[uid, time]) >>>>>>>> >>>>>>>> : +- FlinkLogicalTableSourceScan(table=[[default_catalog, >>>>>>>> default_database, user_behavior, source: [KafkaTableSource(uid, >>>>>> phoneType, >>>>>>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time]) >>>>>>>> >>>>>>>> +- FlinkLogicalSnapshot(period=[$cor0.time]) >>>>>>>> >>>>>>>> +- FlinkLogicalCalc(select=[uid, age]) >>>>>>>> >>>>>>>> +- FlinkLogicalTableSourceScan(table=[[default_catalog, >>>>>>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, >>>> sex, >>>>>>>> age, created_time)]]], fields=[uid, sex, age, created_time]) >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' >>>> left >>>>>>>> table's proctime field, doesn't support 'PROCTIME()' >>>>>>>> >>>>>>>> Please check the documentation for the set of currently supported SQL >>>>>>>> features. >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >>>>>>>> >>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891) >>>>>>>> >>>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >>>>>>>> >>>>>>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >>>>>>>> >>>>>>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) >>>>>>>> >>>>>>>> at >>>> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) >>>>>>>> >>>>>>>> Caused by: org.apache.flink.table.api.TableException: Temporal table >>>>>> join >>>>>>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime >>>>>>>> field, doesn't support 'PROCTIME()' >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529) >>>>>>>> >>>>>>>> at >>>>>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) >>>>>>>> >>>>>>>> ... 20 more >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> query: >>>>>>>> >>>>>>>> >>>>>>>> val streamExecutionEnv = >>>>>> StreamExecutionEnvironment.getExecutionEnvironment >>>>>>>> val blinkEnvSettings = >>>>>>>> >>>>>> >>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>>>>>> val streamTableEnv = >>>>>>>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings) >>>>>>>> >>>>>>>> streamTableEnv.sqlUpdate( >>>>>>>> """ >>>>>>>> | >>>>>>>> |CREATE TABLE user_behavior ( >>>>>>>> | uid VARCHAR, >>>>>>>> | phoneType VARCHAR, >>>>>>>> | clickCount INT, >>>>>>>> | `time` TIMESTAMP(3) >>>>>>>> |) WITH ( >>>>>>>> | 'connector.type' = 'kafka', >>>>>>>> | 'connector.version' = 'universal', >>>>>>>> | 'connector.topic' = 'user_behavior', >>>>>>>> | 'connector.startup-mode' = 'earliest-offset', >>>>>>>> | 'connector.properties.0.key' = 'zookeeper.connect', >>>>>>>> | 'connector.properties.0.value' = >>>>>> 'cdh1:2181,cdh2:2181,cdh3:2181', >>>>>>>> | 'connector.properties.1.key' = 'bootstrap.servers', >>>>>>>> | 'connector.properties.1.value' = >>>>>> 'cdh1:9092,cdh2:9092,cdh3:9092', >>>>>>>> | 'update-mode' = 'append', >>>>>>>> | 'format.type' = 'json', >>>>>>>> | 'format.derive-schema' = 'true' >>>>>>>> |) >>>>>>>> |""".stripMargin) >>>>>>>> streamTableEnv.sqlUpdate( >>>>>>>> """ >>>>>>>> | >>>>>>>> |CREATE TABLE user_cnt ( >>>>>>>> | `time` VARCHAR, >>>>>>>> | sum_age INT >>>>>>>> |) WITH ( >>>>>>>> | 'connector.type' = 'jdbc', >>>>>>>> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard', >>>>>>>> | 'connector.table' = 'user_cnt', >>>>>>>> | 'connector.username' = 'root', >>>>>>>> | 'connector.password' = '123456', >>>>>>>> | 'connector.write.flush.max-rows' = '1' >>>>>>>> |) >>>>>>>> |""".stripMargin) >>>>>>>> val userTableSource = new MysqlAsyncLookupTableSource( >>>>>>>> Array("uid", "sex", "age", "created_time"), >>>>>>>> Array(), >>>>>>>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING)) >>>>>>>> streamTableEnv.registerTableSource("users", userTableSource) >>>>>>>> streamTableEnv.sqlUpdate( >>>>>>>> """ >>>>>>>> | >>>>>>>> |insert into user_cnt >>>>>>>> |SELECT >>>>>>>> | cast(b.`time` as string), u.age >>>>>>>> |FROM >>>>>>>> | user_behavior AS b >>>>>>>> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u >>>>>>>> | ON b.uid = u.uid >>>>>>>> | >>>>>>>> |""".stripMargin) >>>>>>>> streamTableEnv.execute("Temporal table join") >>>>>> >>>> |
Got it.
在 2020-06-12 17:46:22,"咖啡泡油条" <[hidden email]> 写道: >可以参考之前的邮件列表 >https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E > > > > >------------------ 原始邮件 ------------------ >发件人: "Leonard Xu"<[hidden email]>; >发送时间: 2020年6月12日(星期五) 下午5:43 >收件人: "user-zh"<[hidden email]>; > >主题: Re: flink sql Temporal table join failed > > > > >你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。 > >祝好 >Leonard Xu > >> 在 2020年6月12日,17:38,Zhou Zach <[hidden email]> 写道: >> >> >> >> >> 是的,1.10.0版本 >> >> >> >> >> >> >> >> >> 在 2020-06-12 16:28:15,"Benchao Li" <[hidden email]> 写道: >>> 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。 >>> >>> Zhou Zach <[hidden email]> 于2020年6月12日周五 下午3:47写道: >>> >>>> 还是不行, >>>> SLF4J: Class path contains multiple SLF4J bindings. >>>> SLF4J: Found binding in >>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>> SLF4J: Found binding in >>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>>> explanation. >>>> SLF4J: Actual binding is of type >>>> [org.apache.logging.slf4j.Log4jLoggerFactory] >>>> ERROR StatusLogger No log4j2 configuration file found. Using default >>>> configuration: logging only errors to the console. >>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException: >>>> SQL parse failed. Encountered "time FROM" at line 1, column 44. >>>> Was expecting one of: >>>> "CURSOR" ... >>>> "EXISTS" ... >>>> "NOT" ... >>>> "ROW" ... >>>> "(" ... >>>> "+" ... >>>> "-" ... >>>> <UNSIGNED_INTEGER_LITERAL> ... >>>> <DECIMAL_NUMERIC_LITERAL> ... >>>> <APPROX_NUMERIC_LITERAL> ... >>>> <BINARY_STRING_LITERAL> ... >>>> <PREFIXED_STRING_LITERAL> ... >>>> <QUOTED_STRING> ... >>>> <UNICODE_STRING_LITERAL> ... >>>> "TRUE" ... >>>> "FALSE" ... >>>> "UNKNOWN" ... >>>> "NULL" ... >>>> <LBRACE_D> ... >>>> <LBRACE_T> ... >>>> <LBRACE_TS> ... >>>> "DATE" ... >>>> "TIME" <QUOTED_STRING> ... >>>> "TIMESTAMP" ... >>>> "INTERVAL" ... >>>> "?" ... >>>> "CAST" ... >>>> "EXTRACT" ... >>>> "POSITION" ... >>>> "CONVERT" ... >>>> "TRANSLATE" ... >>>> "OVERLAY" ... >>>> "FLOOR" ... >>>> "CEIL" ... >>>> "CEILING" ... >>>> "SUBSTRING" ... >>>> "TRIM" ... >>>> "CLASSIFIER" ... >>>> "MATCH_NUMBER" ... >>>> "RUNNING" ... >>>> "PREV" ... >>>> "NEXT" ... >>>> "JSON_EXISTS" ... >>>> "JSON_VALUE" ... >>>> "JSON_QUERY" ... >>>> "JSON_OBJECT" ... >>>> "JSON_OBJECTAGG" ... >>>> "JSON_ARRAY" ... >>>> "JSON_ARRAYAGG" ... >>>> <LBRACE_FN> ... >>>> "MULTISET" ... >>>> "ARRAY" ... >>>> "MAP" ... >>>> "PERIOD" ... >>>> "SPECIFIC" ... >>>> <IDENTIFIER> ... >>>> <QUOTED_IDENTIFIER> ... >>>> <BACK_QUOTED_IDENTIFIER> ... >>>> <BRACKET_QUOTED_IDENTIFIER> ... >>>> <UNICODE_QUOTED_IDENTIFIER> ... >>>> "ABS" ... >>>> "AVG" ... >>>> "CARDINALITY" ... >>>> "CHAR_LENGTH" ... >>>> "CHARACTER_LENGTH" ... >>>> "COALESCE" ... >>>> "COLLECT" ... >>>> "COVAR_POP" ... >>>> "COVAR_SAMP" ... >>>> "CUME_DIST" ... >>>> "COUNT" ... >>>> "CURRENT_DATE" ... >>>> "CURRENT_TIME" ... >>>> "CURRENT_TIMESTAMP" ... >>>> "DENSE_RANK" ... >>>> "ELEMENT" ... >>>> "EXP" ... >>>> "FIRST_VALUE" ... >>>> "FUSION" ... >>>> "GROUPING" ... >>>> "HOUR" ... >>>> "LAG" ... >>>> "LEAD" ... >>>> "LEFT" ... >>>> "LAST_VALUE" ... >>>> "LN" ... >>>> "LOCALTIME" ... >>>> "LOCALTIMESTAMP" ... 
>>>> "LOWER" ... >>>> "MAX" ... >>>> "MIN" ... >>>> "MINUTE" ... >>>> "MOD" ... >>>> "MONTH" ... >>>> "NTH_VALUE" ... >>>> "NTILE" ... >>>> "NULLIF" ... >>>> "OCTET_LENGTH" ... >>>> "PERCENT_RANK" ... >>>> "POWER" ... >>>> "RANK" ... >>>> "REGR_COUNT" ... >>>> "REGR_SXX" ... >>>> "REGR_SYY" ... >>>> "RIGHT" ... >>>> "ROW_NUMBER" ... >>>> "SECOND" ... >>>> "SQRT" ... >>>> "STDDEV_POP" ... >>>> "STDDEV_SAMP" ... >>>> "SUM" ... >>>> "UPPER" ... >>>> "TRUNCATE" ... >>>> "USER" ... >>>> "VAR_POP" ... >>>> "VAR_SAMP" ... >>>> "YEAR" ... >>>> "CURRENT_CATALOG" ... >>>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ... >>>> "CURRENT_PATH" ... >>>> "CURRENT_ROLE" ... >>>> "CURRENT_SCHEMA" ... >>>> "CURRENT_USER" ... >>>> "SESSION_USER" ... >>>> "SYSTEM_USER" ... >>>> "NEW" ... >>>> "CASE" ... >>>> "CURRENT" ... >>>> >>>> at >>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50) >>>> at >>>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79) >>>> at >>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) >>>> at >>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) >>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) >>>> at >>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) >>>> at >>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) >>>> at >>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436) >>>> at >>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) >>>> at >>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) >>>> at >>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) >>>> at >>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) >>>> at >>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) >>>> at >>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63) >>>> at >>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) >>>> >>>> >>>> query: >>>> >>>> >>>> streamTableEnv.sqlUpdate( >>>> """ >>>> | >>>> |CREATE TABLE user_behavior ( >>>> | uid VARCHAR, >>>> | phoneType VARCHAR, >>>> | clickCount INT, >>>> | proctime AS PROCTIME(), >>>> | 
    |    `time` TIMESTAMP(3)
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)
streamTableEnv.sqlUpdate(
  """
    |INSERT INTO user_cnt
    |SELECT
    |    CAST(b.`time` AS STRING), u.age
    |FROM
    |    user_behavior AS b
    |    JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
    |    ON b.uid = u.uid
    |""".stripMargin)

However, PROCTIME() AS proctime does execute successfully when placed in the SELECT clause, while proctime AS PROCTIME() in the SELECT clause fails as well.

On 2020-06-12 15:29:49, "Benchao Li" <[hidden email]> wrote:

You have it backwards: it should be proctime AS PROCTIME().
For a computed column, the AS direction is the reverse of the AS alias in an ordinary query. (See the first sketch at the end of this thread.)

On Fri, Jun 12, 2020 at 14:24, Zhou Zach <[hidden email]> wrote:

Flink 1.10.0:
Adding a PROCTIME() AS proctime field in CREATE TABLE reports an error.

On 2020-06-12 14:08:11, "Benchao Li" <[hidden email]> wrote:

Hi,

A temporal table join needs processing time. Your b.`time` is an ordinary timestamp, not an event-time attribute. See [1]. (The second sketch at the end of this thread shows the resulting join.)

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html

On Fri, Jun 12, 2020 at 13:33, Zhou Zach <[hidden email]> wrote:
(the original TableException and its full stack trace, identical to the error at the top of this thread)
query:

val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)

streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    `time` TIMESTAMP(3)
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)
streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_cnt (
    |    `time` VARCHAR,
    |    sum_age INT
    |) WITH (
    |    'connector.type' = 'jdbc',
    |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
    |    'connector.table' = 'user_cnt',
    |    'connector.username' = 'root',
    |    'connector.password' = '123456',
    |    'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)
val userTableSource = new MysqlAsyncLookupTableSource(
  Array("uid", "sex", "age", "created_time"),
  Array(),
  Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
streamTableEnv.registerTableSource("users", userTableSource)
streamTableEnv.sqlUpdate(
  """
    |INSERT INTO user_cnt
    |SELECT
    |    CAST(b.`time` AS STRING), u.age
    |FROM
    |    user_behavior AS b
    |    JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
    |    ON b.uid = u.uid
    |""".stripMargin)
streamTableEnv.execute("Temporal table join")
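
To make the two AS directions concrete, here is a minimal sketch. It is not from the thread itself: the table name user_behavior_with_proctime is made up for illustration, the connector options are copied from the DDL above, and it assumes a streamTableEnv created as in the quoted code (Flink 1.10, blink planner).

// DDL computed column: the new column's NAME comes first, the expression second.
streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_behavior_with_proctime (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    `time` TIMESTAMP(3),
    |    proctime AS PROCTIME()  -- name AS expression
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

// Query alias: the EXPRESSION comes first, the alias second -- the opposite order,
// which is why PROCTIME() AS proctime works in a SELECT but not in a CREATE TABLE.
val withProctime = streamTableEnv.sqlQuery(
  "SELECT uid, PROCTIME() AS proctime FROM user_behavior_with_proctime")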
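
Given that DDL, the temporal table join can then reference the declared processing-time attribute instead of a plain TIMESTAMP column or a PROCTIME() call, which is what the TableException asks for. A sketch of the rewritten join, assuming users and user_cnt are registered as in the quoted code:

// FOR SYSTEM_TIME AS OF must point at the left table's proctime attribute.
streamTableEnv.sqlUpdate(
  """
    |INSERT INTO user_cnt
    |SELECT
    |    CAST(b.`time` AS STRING), u.age
    |FROM
    |    user_behavior_with_proctime AS b
    |    JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u
    |    ON b.uid = u.uid
    |""".stripMargin)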