flink sql sink mysql requires primary keys

flink sql sink mysql requires primary keys

Zhou Zach-2
Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
        at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68)
        at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)

Query:
Flink: 1.10.0

CREATE TABLE user_uv (
    `time` VARCHAR,
    cnt BIGINT
) WITH (
    'connector.type' = 'jdbc'
)

INSERT INTO user_uv
SELECT MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) AS `time`, COUNT(DISTINCT uid) AS cnt
FROM `user`
GROUP BY DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')

Re: flink sql sink mysql requires primary keys

Zhou Zach
Adding a primary key gives an error:
Exception in thread "main" org.apache.flink.table.planner.operations.SqlConversionException: Primary key and unique key are not supported yet.
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:169)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:52)
at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)


Query:


streamTableEnv.sqlUpdate(
"""
    |
    |CREATE TABLE user_uv(
    |    `time` VARCHAR,
    |    cnt bigint,
    |    PRIMARY KEY (`time`)
    |) WITH (
    |    'connector.type' = 'jdbc',
    |    'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)

Re: flink sql sink mysql requires primary keys

Jark
Administrator
Hi,

In Flink 1.10, the sink's primary key is derived from the query. If no primary key can be derived from the query, you get exactly the error you saw:
"UpsertStreamTableSink requires that Table has a full primary keys if it is
updated."
Your job is such a case: the primary key cannot be derived from the query.
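A possible workaround on 1.10 is to select the grouping expression directly instead of wrapping it in MAX(), so the planner can map the output column back to the GROUP BY key and derive the key from it. A sketch only, not tested against your job; the MySQL table still needs a matching unique key for the upsert to take effect:

    INSERT INTO user_uv
    SELECT DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00') AS `time`,
           COUNT(DISTINCT uid) AS cnt
    FROM `user`
    GROUP BY DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')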

Also, declaring a PRIMARY KEY in the DDL is not supported in 1.10 either.

Both problems are solved in 1.11; you can try building the release-1.11 branch yourself and give it a try.
In Flink 1.11, the sink's primary key is always what the user declares explicitly in the DDL; it is no longer derived from the query.
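For reference, a minimal 1.11-style sink DDL sketch. The connection options are placeholders for your environment (1.11 uses the new 'connector' = 'jdbc' factory), and the key must be declared NOT ENFORCED:

    CREATE TABLE user_uv (
        `time` VARCHAR,
        cnt BIGINT,
        PRIMARY KEY (`time`) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/test', -- placeholder
        'table-name' = 'user_uv'                    -- placeholder
    )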

Best,
Jark

