使用Flink 1.10 blink planner写ES的异常问题

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

使用Flink 1.10 blink planner写ES的异常问题

sunfulin
Hi,
我使用Flink 1.10,开启了Blink Planner,在尝试写入ES且使用UpsertMode时(sql就是insert into table select xxx group by xxxxx),抛出了如下异常:
我通过DDL尝试定义一个ESTableSink,且声名primary key时,运行时又说Primary key和unique key目前不支持。。那这就是个悖论啊。。真的不科学。


关键问题:我切换回使用老的planner时,是没问题的。。这可能是Blink Planner的bug么?真心请教。


org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:82)
at com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.doJob(ZhangleClientComputeTask.java:80)
at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:50)
at com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.main(ZhangleClientComputeTask.java:27)

Reply | Threaded
Open this post in threaded view
|

Re: 使用Flink 1.10 blink planner写ES的异常问题

Jark
Administrator
Hi sunfulin,

这个异常是说通过 query 推断不出 query 的 primary key,不是说 sink 没有 primary key。至于为什么 query
推断不出 pk,可能要结合 query 看一下。
我看到你在 user@ 里面也发邮件了,我已经在那下面回复了,我们要不在 user@ 邮件下面继续讨论吧。可以将你们的 SQL 补充一下,包括
DDL。

Best,
Jark

On Fri, 14 Feb 2020 at 23:03, sunfulin <[hidden email]> wrote:

> Hi,
> 我使用Flink 1.10,开启了Blink Planner,在尝试写入ES且使用UpsertMode时(sql就是insert into
> table select xxx group by xxxxx),抛出了如下异常:
> 我通过DDL尝试定义一个ESTableSink,且声名primary key时,运行时又说Primary key和unique
> key目前不支持。。那这就是个悖论啊。。真的不科学。
>
>
> 关键问题:我切换回使用老的planner时,是没问题的。。这可能是Blink Planner的bug么?真心请教。
>
>
> org.apache.flink.table.api.TableException: UpsertStreamTableSink requires
> that Table has a full primary keys if it is updated.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at
> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:82)
> at
> com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.doJob(ZhangleClientComputeTask.java:80)
> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:50)
> at
> com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.main(ZhangleClientComputeTask.java:27)
>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: 使用Flink 1.10 blink planner写ES的异常问题

sunfulin
好的,感谢。我在user里附加了query SQL。











在 2020-02-15 16:14:56,"Jark Wu" <[hidden email]> 写道:

>Hi sunfulin,
>
>这个异常是说通过 query 推断不出 query 的 primary key,不是说 sink 没有 primary key。至于为什么 query
>推断不出 pk,可能要结合 query 看一下。
>我看到你在 user@ 里面也发邮件了,我已经在那下面回复了,我们要不在 user@ 邮件下面继续讨论吧。可以将你们的 SQL 补充一下,包括
>DDL。
>
>Best,
>Jark
>
>On Fri, 14 Feb 2020 at 23:03, sunfulin <[hidden email]> wrote:
>
>> Hi,
>> 我使用Flink 1.10,开启了Blink Planner,在尝试写入ES且使用UpsertMode时(sql就是insert into
>> table select xxx group by xxxxx),抛出了如下异常:
>> 我通过DDL尝试定义一个ESTableSink,且声名primary key时,运行时又说Primary key和unique
>> key目前不支持。。那这就是个悖论啊。。真的不科学。
>>
>>
>> 关键问题:我切换回使用老的planner时,是没问题的。。这可能是Blink Planner的bug么?真心请教。
>>
>>
>> org.apache.flink.table.api.TableException: UpsertStreamTableSink requires
>> that Table has a full primary keys if it is updated.
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>> at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> at scala.collection.Iterator.foreach(Iterator.scala:937)
>> at scala.collection.Iterator.foreach$(Iterator.scala:937)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> at
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:82)
>> at
>> com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.doJob(ZhangleClientComputeTask.java:80)
>> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:50)
>> at
>> com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.main(ZhangleClientComputeTask.java:27)
>>
>>