你好:
flink1.10版本,实时程序执行如下查询写入hbase,报错(参照 http://apache-flink.147419.n8.nabble.com/ 进行了修改仍然不成功,烦请看下原因) Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) at com.autoai.cns.core.CNSDashboardDML$.responseTime(CNSDashboardDML.scala:178) at com.autoai.cns.core.CNSDashboardDML$.main(CNSDashboardDML.scala:60) at com.autoai.cns.core.CNSDashboardDML.main(CNSDashboardDML.scala) s""" |INSERT INTO ${databaseName}.response_time_sink |SELECT | rowkey, | ROW(`day`, `time`, initialize_route_avg_time, update_detour_avg_time, replace_avg_time, deviate_avg_time) AS cf |FROM |( | select CONCAT_WS('_',CAST(`time` AS VARCHAR),distance_range) rowkey,`day`,`time`, | MAX(CASE req_type WHEN '0' THEN num else 0 END) initialize_route_avg_time, | MAX(CASE req_type WHEN '1' THEN num else 0 END) update_detour_avg_time, | MAX(CASE req_type WHEN '2' THEN num else 0 END) replace_avg_time, | MAX(CASE req_type WHEN '3' THEN num else 0 END) deviate_avg_time | from | (SELECT DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), 'yyyy-MM-dd') `day`, | UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), 'yyyy-MM-dd HH:mm:ss')) * 1000 AS `time`, | req_type, | (CASE WHEN ResponseRemainingMile<=50 THEN '1' | WHEN ResponseRemainingMile> 50 AND ResponseRemainingMile<= 250 THEN '2' | WHEN ResponseRemainingMile> 250 AND ResponseRemainingMile<= 500 THEN '3' | WHEN ResponseRemainingMile> 500 THEN '4' END) as distance_range, | CAST(AVG(`value`) AS INT) num | FROM | ${databaseName}.metric_stream | WHERE | metric = 'http_common_request' | GROUP BY | TUMBLE(proctime, INTERVAL '10' SECOND),req_type,(CASE WHEN ResponseRemainingMile<=50 THEN '1' | WHEN ResponseRemainingMile> 50 AND ResponseRemainingMile<= 250 THEN '2' | WHEN ResponseRemainingMile> 250 AND ResponseRemainingMile<= 500 THEN '3' | WHEN ResponseRemainingMile> 500 THEN '4' END)) | group by CONCAT_WS('_',CAST(`time` AS VARCHAR),distance_range),`day`,`time` |) a |""".stripMargin 期待你的答复,谢谢! |
Hi,
> 在 2020年12月7日,16:41,爱成绕指柔 <[hidden email]> 写道: > > Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. 这个错误是在query 没法推断出主键,而 hbase sink 是一个upsert sink, 需要PK来实现upsert语义。 在1.10的版本的话,你可以试着改写下SQL,比如在query的外层再加一层group by rowkey, 这种方式性能比较差。 建议直接升级1.11.x版本,在1.11.0之后flink 支持定义PK,你这个case只需要在hbase表的DDL上声明rowkey为PK即可,不再需要通过query推断PK。 祝好, Leonard |
Hi,
你是不是没有订阅flink的用户邮件列表,所以有些邮件你看不到。 你可以发送任意内容的邮件到user-[hidden email] <mailto:[hidden email]> 即可订阅用户邮件列表,订阅后邮件列表里大家的提问和回答你都可以看见了。 [1] https://flink.apache.org/zh/community.html > 在 2020年12月7日,16:50,Leonard Xu <[hidden email]> 写道: > > Hi, > >> 在 2020年12月7日,16:41,爱成绕指柔 <[hidden email] <mailto:[hidden email]>> 写道: >> >> Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. > > 这个错误是在query 没法推断出主键,而 hbase sink 是一个upsert sink, 需要PK来实现upsert语义。 > > 在1.10的版本的话,你可以试着改写下SQL,比如在query的外层再加一层group by rowkey, 这种方式性能比较差。 > 建议直接升级1.11.x版本,在1.11.0之后flink 支持定义PK,你这个case只需要在hbase表的DDL上声明rowkey为PK即可,不再需要通过query推断PK。 > > 祝好, > Leonard |
旧版 'connector.type' = 'jdbc',新版 'connector' = 'jdbc'。
新旧区别,旧版根据查询决定key,新版你只需要定义了key就是upsert了,不需要查询符合一定要求。 Leonard Xu <[hidden email]> 于2020年12月7日周一 下午5:11写道: > Hi, > 你是不是没有订阅flink的用户邮件列表,所以有些邮件你看不到。 > 你可以发送任意内容的邮件到user-[hidden email] <mailto: > [hidden email]> 即可订阅用户邮件列表,订阅后邮件列表里大家的提问和回答你都可以看见了。 > > [1] https://flink.apache.org/zh/community.html > > > > 在 2020年12月7日,16:50,Leonard Xu <[hidden email]> 写道: > > > > Hi, > > > >> 在 2020年12月7日,16:41,爱成绕指柔 <[hidden email] <mailto:[hidden email]>> > 写道: > >> > >> Exception in thread "main" org.apache.flink.table.api.TableException: > UpsertStreamTableSink requires that Table has a full primary keys if it is > updated. > > > > 这个错误是在query 没法推断出主键,而 hbase sink 是一个upsert sink, 需要PK来实现upsert语义。 > > > > 在1.10的版本的话,你可以试着改写下SQL,比如在query的外层再加一层group by rowkey, 这种方式性能比较差。 > > 建议直接升级1.11.x版本,在1.11.0之后flink > 支持定义PK,你这个case只需要在hbase表的DDL上声明rowkey为PK即可,不再需要通过query推断PK。 > > > > 祝好, > > Leonard > > |
Free forum by Nabble | Edit this page |