Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()

Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()

史 正超
A proctime attribute defined on the source table cannot be used in a temporal table join after it has been selected through a view.
The SQL is as follows:
```sql

CREATE TABLE SourceA (
    id              STRING,
    procTime AS PROCTIME()
) WITH (
    'connector' = 'kafka-0.11'
);


CREATE TABLE DimTable (
  id             STRING,
  dim1           STRING,
  primary key(id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc'
);


CREATE VIEW TestView AS
SELECT
    id,
    procTime
FROM RouteStaffPostQueue;


INSERT INTO Sink
SELECT
  id,
  dim1,
  sum(post_count)
FROM TestView t1 JOIN DimTable FOR SYSTEM_TIME AS OF t1.procTime d1 ON d1.id = t1.id
GROUP BY id, dim1;

-- Switching to the following SQL works without problems.

INSERT INTO Sink
SELECT
  id,
  dim1,
  sum(post_count)
FROM (SELECT * FROM SourceA) t1 JOIN DimTable FOR SYSTEM_TIME AS OF t1.procTime d1 ON d1.id = t1.id
GROUP BY id, dim1;

```
If the reference to TestView is replaced by the view's own query, there is no problem. The error message is as follows:

```
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2020-11-02 19:21:17.363 [main] INFO  org.apache.flink.table.catalog.CatalogManager  - Set the current default catalog as [mysql] and the current default database as [default].
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalSink(table=[mysql.default.dis_pair_day], fields=[sta_date, dis_code, post_count])
+- FlinkLogicalCalc(select=[CAST(sta_date) AS sta_date, dis_code, CAST(EXPR$2) AS post_count])
   +- FlinkLogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
      +- FlinkLogicalCalc(select=[sta_date, dis_code, 1 AS post_count])
         +- FlinkLogicalJoin(condition=[=($2, $1)], joinType=[inner])
            :- FlinkLogicalCalc(select=[REPLACE(SUBSTR(postTime, 1, 10), _UTF-16LE'-', _UTF-16LE'') AS sta_date, assetCode AS edCode])
            :  +- FlinkLogicalTableSourceScan(table=[[mysql, default, RouteStaffPostQueue]], fields=[postTime, serviceChargeType, assetCode, couponId, realCouponPay, serviceChargeAmount, busyStatus])
            +- FlinkLogicalSnapshot(period=[$cor0.procTime])
               +- FlinkLogicalCalc(select=[ed_code, dis_code])
                  +- FlinkLogicalTableSourceScan(table=[[mysql, default, BasicEdInfo]], fields=[ed_code, dis_code, throw_area_type, is_free])

Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:145)
at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:77)
Caused by: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
at org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:68)
at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:150)
at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:164)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:272)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:256)
at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1476)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1742)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:534)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
... 21 more



```
Re: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()

Leonard Xu
Hi,
Which version are you using?
Before 1.12, a temporal join with a view is not supported; you can only join directly against a table that supports lookup.

Best,
Leonard
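
A minimal sketch of the shape this restriction suggests, with no view on either side of the join: the source table is joined directly against the lookup table. It reuses `SourceA`, `DimTable` and `Sink` from the original post. It assumes that `SourceA` also has a `post_count` column and that `Sink` has a matching schema; neither is declared in the thread.

```sql
-- Sketch only: SourceA.post_count and the Sink schema are assumptions,
-- they are not declared anywhere in the original post.
INSERT INTO Sink
SELECT
  t1.id,
  d1.dim1,
  SUM(t1.post_count)
FROM SourceA AS t1
JOIN DimTable FOR SYSTEM_TIME AS OF t1.procTime AS d1
  ON d1.id = t1.id
GROUP BY t1.id, d1.dim1;
```

The columns are qualified with `t1.` and `d1.` because both tables expose an `id` column.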

> On Nov 2, 2020, at 19:40, 史 正超 <[hidden email]> wrote:
>
> A proctime attribute defined on the source table cannot be used in a temporal table join after it has been selected through a view.

Re: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()

史 正超
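Thanks for the reply. That is roughly the case: the root cause is that the source's procTime loses its time attribute once it passes through the view and becomes a plain TIMESTAMP type.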
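
One possible way to check this diagnosis is to compare the schema of the source table with that of the view. This is a sketch, assuming a Flink version in which `DESCRIBE` is available and accepts views as well as tables.

```sql
-- Sketch only: assumes DESCRIBE is supported for both tables and views
-- in the Flink version in use.
DESCRIBE SourceA;   -- procTime is expected to be shown as a processing-time attribute
DESCRIBE TestView;  -- compare the type reported for procTime here
```

A processing-time attribute is usually shown with a `*PROCTIME*` marker next to its type. If `procTime` shows up in `TestView` as a plain `TIMESTAMP(3)` without that marker, that would be consistent with the time attribute having been lost in the view.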
________________________________
From: Leonard Xu <[hidden email]>
Sent: November 3, 2020, 1:48
To: user-zh <[hidden email]>
Subject: Re: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()

Hi,
Which version are you using?
Before 1.12, a temporal join with a view is not supported; you can only join directly against a table that supports lookup.

Best,
Leonard
