Hi,
请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row<c1 varchar, c2 varchar>。 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈? |
Hi,
我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么? 能附上异常栈就更好啦。 sunfulin <[hidden email]> 于2020年6月25日周四 下午4:35写道: > Hi, > 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row<c1 varchar, c2 varchar>。 > 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈? -- Best, Benchao Li |
hi, 谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner org.apache.flink.table.api.ValidationException: Only simple types that can be safely converted into a string representation can be used as keys. But was: Row(commentId: String, commentContent: String) at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310) at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93) at com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42) at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65) at com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47) 在 2020-06-28 10:15:34,"Benchao Li" <[hidden email]> 写道: >Hi, >我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么? >能附上异常栈就更好啦。 > >sunfulin <[hidden email]> 于2020年6月25日周四 下午4:35写道: > >> Hi, >> 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row<c1 varchar, c2 varchar>。 >> 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈? > > > >-- > >Best, >Benchao Li |
Hi,
异常信息很有用,group by ROW 在 Flink SQL 里是支持的,只是在 ElasticSearchUpsertSink 的时候不支持,原因是 ElasticSearchUpsertSink 需要将 group by 的 keys 字段转换成 String 用于构造 UpdateRequest、DeleteRequest 对象,更进一步的原因是 keys 字段转换成的 String 对应了 Es 中的 id 字段(String 类型)。So, 当前ElasticSearchUpsertSink 的实现中,只支持将简单类型作为 keys ,复杂类型不支持,复杂类型toString()的结果可能不是我们想要的。 你可以试下下面的query,query keys 对应es中的 id 就是 commentId${keyDelimiter}commentContent, 这也应该是你需要的结果 Select ROW(commentId, commentContent) from T group by commentId, commentContent 祝好, Leonard Xu > 在 2020年6月28日,22:33,sunfulin <[hidden email]> 写道: > > > > > > > > hi, > 谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner > > > org.apache.flink.table.api.ValidationException: Only simple types that can be safely converted into a string representation can be used as keys. But was: Row(commentId: String, commentContent: String) > at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310) > at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152) > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111) > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) > at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60) > at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685) > at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93) > at com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42) > at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65) > at com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47) > > > > > > > > > > > > > > > > 在 2020-06-28 10:15:34,"Benchao Li" <[hidden email]> 写道: >> Hi, >> 我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么? >> 能附上异常栈就更好啦。 >> >> sunfulin <[hidden email]> 于2020年6月25日周四 下午4:35写道: >> >>> Hi, >>> 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row<c1 varchar, c2 varchar>。 >>> 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈? >> >> >> >> -- >> >> Best, >> Benchao Li |
hi, Leonard 这个写法应该是OK,不过我的场景下是下面这种 select a, b, row(commentId, commentContent) from T group by a, b, commentId, commentContent 这种情况下推导PK貌似会报错(UpsertStreamTableSink requires full primary keys)。这种应该怎么写哈? 在 2020-06-29 10:19:31,"Leonard Xu" <[hidden email]> 写道: >Hi, >异常信息很有用,group by ROW 在 Flink SQL 里是支持的,只是在 ElasticSearchUpsertSink 的时候不支持,原因是 ElasticSearchUpsertSink 需要将 group by 的 keys 字段转换成 String 用于构造 UpdateRequest、DeleteRequest 对象,更进一步的原因是 keys 字段转换成的 String 对应了 Es 中的 id 字段(String 类型)。So, 当前ElasticSearchUpsertSink 的实现中,只支持将简单类型作为 keys ,复杂类型不支持,复杂类型toString()的结果可能不是我们想要的。 > >你可以试下下面的query,query keys 对应es中的 id 就是 commentId${keyDelimiter}commentContent, 这也应该是你需要的结果 >Select ROW(commentId, commentContent) from T >group by commentId, commentContent > >祝好, >Leonard Xu > >> 在 2020年6月28日,22:33,sunfulin <[hidden email]> 写道: >> >> >> >> >> >> >> >> hi, >> 谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner >> >> >> org.apache.flink.table.api.ValidationException: Only simple types that can be safely converted into a string representation can be used as keys. But was: Row(commentId: String, commentContent: String) >> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310) >> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152) >> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111) >> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) >> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) >> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) >> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) >> at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60) >> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) >> at scala.collection.Iterator.foreach(Iterator.scala:937) >> at scala.collection.Iterator.foreach$(Iterator.scala:937) >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) >> at scala.collection.IterableLike.foreach(IterableLike.scala:70) >> at scala.collection.IterableLike.foreach$(IterableLike.scala:69) >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> at scala.collection.TraversableLike.map(TraversableLike.scala:233) >> at scala.collection.TraversableLike.map$(TraversableLike.scala:226) >> at scala.collection.AbstractTraversable.map(Traversable.scala:104) >> at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) >> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) >> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685) >> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) >> at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93) >> at com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42) >> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65) >> at com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47) >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-06-28 10:15:34,"Benchao Li" <[hidden email]> 写道: >>> Hi, >>> 我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么? >>> 能附上异常栈就更好啦。 >>> >>> sunfulin <[hidden email]> 于2020年6月25日周四 下午4:35写道: >>> >>>> Hi, >>>> 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row<c1 varchar, c2 varchar>。 >>>> 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈? >>> >>> >>> >>> -- >>> >>> Best, >>> Benchao Li |
> 在 2020年6月29日,12:05,sunfulin <[hidden email]> 写道: > > 这种情况下推导PK貌似会报错(UpsertStreamTableSink requires full primary keys)。这种应该怎么写哈? Hi, 在1.10.x 版本中,upsertSink 中推导 pk 是通过query 来推导,这个比较好的解决是等1.11发布后,通过在建表的DDL声明主键( PRIMARY KEY NOT ENFORCED), 如果要在1.10.x里解决,一般是改写下query,使得推导的pk能符合预期。这个写入es的sink要求 pk 是简单类型,而你的query又需要ROW(c, d) 复合类型, 不太好改写。想到hack一点的方式就是把c,d 拼接成一个字段c${delimeter}d,ROW(c, d) 用UDF构造,感觉这种也比较绕。如果业务上不是强需求ROW(c, d),又等不及1.11的话,可以在ES里多加一列就好了。 祝好, Leonard Xu |
Free forum by Nabble | Edit this page |