Hi,
I'm using Flink 1.10 with the Blink planner enabled. When writing to ES in upsert mode (the SQL is essentially "insert into table select xxx group by xxxxx"), the exception below is thrown. When I instead try to define an ES table sink via DDL and declare a primary key, at runtime I'm told that primary keys and unique keys are not yet supported. That looks like a catch-22 to me and doesn't make sense.

The key point: when I switch back to the old planner, everything works fine. Could this be a bug in the Blink planner? I'd sincerely appreciate any advice.

org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
	at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:82)
	at com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.doJob(ZhangleClientComputeTask.java:80)
	at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:50)
	at com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.main(ZhangleClientComputeTask.java:27)
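For context, a minimal sketch of the kind of statements described above. All table names, field names, and connector properties here are placeholders for illustration (following the Flink 1.10 connector property keys as I understand them), not the actual job SQL:

-- Hypothetical ES sink registered via DDL, upsert mode, no PRIMARY KEY declared
-- (declaring one in the 1.10 DDL is what triggers the "not supported" error mentioned above)
CREATE TABLE es_sink (
  user_id STRING,
  cnt BIGINT
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'http://localhost:9200',
  'connector.index' = 'user_cnt',
  'connector.document-type' = 'cnt',
  'update-mode' = 'upsert',
  'format.type' = 'json'
);

-- Grouped insert of the "insert into ... select ... group by ..." shape described above
INSERT INTO es_sink
SELECT user_id, COUNT(*) AS cnt
FROM source_table
GROUP BY user_id;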
Hi sunfulin,
This exception means that the planner cannot infer a primary key from the query, not that the sink has no primary key. As for why no primary key can be inferred, we would need to look at the query itself. I saw that you also posted to the user@ list, and I have already replied there; shall we continue the discussion under that user@ thread? Please also share your SQL there, including the DDL.

Best,
Jark
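P.S. To illustrate what "inferring the primary key from the query" means, here is a rough sketch with made-up table names (this may or may not match your actual query): when the sink consumes the aggregate result directly, the GROUP BY keys can serve as the upsert key; if the outer projection transforms those key columns, the planner may no longer be able to derive a key and reports exactly this error.

-- Key inference typically works here: the group key user_id reaches the sink unchanged,
-- so the planner can use it as the upsert key
INSERT INTO es_sink
SELECT user_id, COUNT(*) AS cnt
FROM source_table
GROUP BY user_id;

-- Key inference can fail here: the group key only reaches the sink wrapped in a function,
-- so no unique key may be derivable for the output and the above exception is thrown
INSERT INTO es_sink
SELECT CONCAT(user_id, '_suffix') AS user_id, COUNT(*) AS cnt
FROM source_table
GROUP BY user_id;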
OK, thanks. I've attached the query SQL in the user@ thread.