A proctime attribute defined on a source table cannot be used for a temporal table join after it has been selected through a view.
The SQL is as follows:

```sql
CREATE TABLE SourceA (
    id STRING,
    procTime AS PROCTIME()
) WITH (
    'connector' = 'kafka-0.11'
);

CREATE TABLE DimTable (
    id STRING,
    dim1 STRING,
    primary key(id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc'
);

CREATE VIEW TestView AS
SELECT
    id,
    procTime
FROM RouteStaffPostQueue;

INSERT INTO Sink
SELECT
    id,
    dim1,
    sum(post_count)
FROM TestView t1 JOIN DimTable FOR SYSTEM_TIME AS OF t1.procTime d1 ON d1.id = t1.id
GROUP BY id, dim1;

-- Switching to the following SQL works without any problem.

INSERT INTO Sink
SELECT
    id,
    dim1,
    sum(post_count)
FROM (SELECT * FROM SourceA) t1 JOIN DimTable FOR SYSTEM_TIME AS OF t1.procTime d1 ON d1.id = t1.id
GROUP BY id, dim1;
```

If TestView is replaced with the view's underlying query, there is no problem. The error message is as follows:

```
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2020-11-02 19:21:17.363 [main] INFO org.apache.flink.table.catalog.CatalogManager - Set the current default catalog as [mysql] and the current default database as [default].
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalSink(table=[mysql.default.dis_pair_day], fields=[sta_date, dis_code, post_count])
+- FlinkLogicalCalc(select=[CAST(sta_date) AS sta_date, dis_code, CAST(EXPR$2) AS post_count])
   +- FlinkLogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
      +- FlinkLogicalCalc(select=[sta_date, dis_code, 1 AS post_count])
         +- FlinkLogicalJoin(condition=[=($2, $1)], joinType=[inner])
            :- FlinkLogicalCalc(select=[REPLACE(SUBSTR(postTime, 1, 10), _UTF-16LE'-', _UTF-16LE'') AS sta_date, assetCode AS edCode])
            :  +- FlinkLogicalTableSourceScan(table=[[mysql, default, RouteStaffPostQueue]], fields=[postTime, serviceChargeType, assetCode, couponId, realCouponPay, serviceChargeAmount, busyStatus])
            +- FlinkLogicalSnapshot(period=[$cor0.procTime])
               +- FlinkLogicalCalc(select=[ed_code, dis_code])
                  +- FlinkLogicalTableSourceScan(table=[[mysql, default, BasicEdInfo]], fields=[ed_code, dis_code, throw_area_type, is_free])

Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
    at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
    at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:145)
    at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:77)
Caused by: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
    at org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:68)
    at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:150)
    at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:164)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:272)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:256)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1476)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1742)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
    at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
    at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
    at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
    at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:534)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
    ... 21 more
```
Hi,

Which version are you using? Before 1.12, a temporal join against a view is not supported; you can only join directly against a table that supports lookup.

Best,
Leonard

> On November 2, 2020, at 19:40, 史 正超 <[hidden email]> wrote:
>
> A proctime attribute defined on a source table cannot be used for a temporal table join after it has been selected through a view.
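Thanks for the reply. That is roughly the situation. The root cause is that the source's procTime loses its time attribute once it passes through the view and becomes a plain TIMESTAMP type.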
________________________________
From: Leonard Xu <[hidden email]>
Sent: November 3, 2020, 1:48
To: user-zh <[hidden email]>
Subject: Re: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()
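For anyone hitting the same error, the root cause described above can be checked and worked around roughly as follows. This is only a sketch: it reuses the table and view names from the original post, assumes `DESCRIBE` is available for both tables and views in your Flink version, and leaves out the aggregation from the original query. The exact `DESCRIBE` output format varies between versions.

```sql
-- Sketch only; names come from the original post, output format is version dependent.

-- 1. Compare how procTime is typed on the source table and on the view.
--    A column that still carries the processing-time attribute is typically
--    reported with a *PROCTIME* marker next to its TIMESTAMP type; a plain
--    TIMESTAMP without the marker suggests the attribute was lost.
DESCRIBE SourceA;
DESCRIBE TestView;

-- 2. Possible workaround on versions before 1.12: skip the view and join the
--    source table directly, so the planner sees the original proctime column.
SELECT
    t1.id,
    d1.dim1
FROM SourceA AS t1
JOIN DimTable FOR SYSTEM_TIME AS OF t1.procTime AS d1
    ON d1.id = t1.id;
```

Per Leonard's reply, upgrading to 1.12 or later may remove the need for the workaround.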