Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68) at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala) Query: Flink :1.10.0 CREATE TABLE user_uv( | `time` VARCHAR, | cnt bigint |) WITH ( | 'connector.type' = 'jdbc') |insert into user_uv |select MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`, COUNT(DISTINCT uid) as cnt |from `user` |group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00') |
加了primary key报错,
Exception in thread "main" org.apache.flink.table.planner.operations.SqlConversionException: Primary key and unique key are not supported yet. at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:169) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:52) at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala) Query: streamTableEnv.sqlUpdate( """ | |CREATE TABLE user_uv( | `time` VARCHAR, | cnt bigint, | PRIMARY KEY (`time`) |) WITH ( | 'connector.type' = 'jdbc', | 'connector.write.flush.max-rows' = '1' |) |""".stripMargin) At 2020-06-17 20:59:35, "Zhou Zach" <[hidden email]> wrote: >Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113) > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68) > at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala) > > > > > >Query: >Flink :1.10.0 >CREATE TABLE user_uv( >| `time` VARCHAR, >| cnt bigint >|) WITH ( >| 'connector.type' = 'jdbc') >|insert into user_uv >|select MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`, COUNT(DISTINCT uid) as cnt >|from `user` >|group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00') |
Administrator
|
Hi,
在 Flink 1.10 中,sink 的 primary key 是从 query 推导的,如果 query 推导不出 pk 就会报你看到的错误 “UpsertStreamTableSink requires that Table has a full primary keys if it is updated.” 你的这个作业就是 query pk 推导不出来的 case。 此外 DDL 上声明 PK 在1.10也是不支持的。 这些问题,在 1.11 都解决了,可以尝试自己拿 release-1.11 分支编译下尝试下。 Flink 1.11 中,sink的 primary key 都是从 DDL 上用户显式声明出来的,不会去推导 query pk。 Best, Jark On Thu, 18 Jun 2020 at 09:39, Zhou Zach <[hidden email]> wrote: > 加了primary key报错, > Exception in thread "main" > org.apache.flink.table.planner.operations.SqlConversionException: Primary > key and unique key are not supported yet. > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:169) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > at > org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:52) > at > org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala) > > > Query: > > > streamTableEnv.sqlUpdate( > """ > | > |CREATE TABLE user_uv( > | `time` VARCHAR, > | cnt bigint, > | PRIMARY KEY (`time`) > |) WITH ( > | 'connector.type' = 'jdbc', > | 'connector.write.flush.max-rows' = '1' > |) > |""".stripMargin) > > > > > > > > > > > > > > > > > > At 2020-06-17 20:59:35, "Zhou Zach" <[hidden email]> wrote: > >Exception in thread "main" org.apache.flink.table.api.TableException: > UpsertStreamTableSink requires that Table has a full primary keys if it is > updated. > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113) > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) > > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > > at > org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68) > > at > org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala) > > > > > > > > > > > >Query: > >Flink :1.10.0 > >CREATE TABLE user_uv( > >| `time` VARCHAR, > >| cnt bigint > >|) WITH ( > >| 'connector.type' = 'jdbc') > >|insert into user_uv > >|select MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`, > COUNT(DISTINCT uid) as cnt > >|from `user` > >|group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00') > |
Free forum by Nabble | Edit this page |