自实现了kudu connector报错: 2020-09-09 18:34:59,442 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL statement. at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:579) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:515) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:596) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:315) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_262] at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256) ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:327) ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:284) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.ExecutionContext.createPipeline(ExecutionContext.java:281) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 8 more 希望是想调用DynamicTableSink,但是却调用老的 UpsertStreamTableSink了 |
Hi
这个错误一般是你的query 是upsert的query,没能推断出PK,所以报错了 。 如果是自定义的connector, 应该实现 DynamicTableSink 接口而不是 老的 UpsertStreamTableSink接口, 实现DynamicTableSink接口可以支持在表上定义PK,不用推导。 看这个报错,kudu的connector实现的是 老的UpsertStreamTableSink, 绕过的办法是改写下你的query,让query可以推导出pk。 祝好 Leonard > 在 2020年9月9日,20:27,kandy.wang <[hidden email]> 写道: > > > > 自实现了kudu connector报错: > > > 2020-09-09 18:34:59,442 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. > > org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL statement. > > at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:579) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:515) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:596) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:315) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_262] > > at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256) ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:327) ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:284) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.client.gateway.local.ExecutionContext.createPipeline(ExecutionContext.java:281) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > > ... 8 more > 希望是想调用DynamicTableSink,但是却调用老的 UpsertStreamTableSink了 |
Free forum by Nabble | Edit this page |