Hi everyone,
Flink version: 1.9.0.

SQL1:

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = 'localhost:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'localhost:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

SQL2:

CREATE TABLE user_dist (
    dt VARCHAR,
    user_id VARCHAR,
    behavior VARCHAR
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
    'connector.table' = 'user_behavior_dup',
    'connector.username' = 'root',
    'connector.password' = '******',
    'connector.write.flush.max-rows' = '1'
);

SQL3:

INSERT INTO user_dist
SELECT dt, user_id, behavior
FROM (
    SELECT dt, user_id, behavior,
           ROW_NUMBER() OVER (
               PARTITION BY dt, user_id, behavior
               ORDER BY proc DESC
           ) AS rownum
    FROM (
        SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') AS dt,
               user_id, behavior, PROCTIME() AS proc
        FROM user_log
    )
)
WHERE rownum = 1;

When tableEnv.sqlUpdate(SQL3) is executed for SQL3, the following error is thrown:

Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)

Does deduplication currently not support the INSERT INTO ... SELECT syntax?

叶贤勋
[hidden email]
Hi 叶贤勋:
Deduplication does currently support the INSERT INTO ... SELECT syntax. The question is why your SQL does not produce a unique key; there may be a bug in the blink planner here.

CC: @Jark Wu @godfrey he (JIRA)

Best,
Jingsong Lee
Hi 叶贤勋:
In your SQL, ORDER BY proc DESC combined with rownum = 1 is equivalent to last-row logic. That produces retractions, but the result table (user_dist) defines no primary key, which is not supported in that case, hence the error you see.

If you change ORDER BY proc DESC to ORDER BY proc ASC, then together with rownum = 1 it is equivalent to first-row logic, which produces no retractions, and the result table (user_dist) then satisfies the requirements.

However, the blink planner currently has a problem handling PROCTIME(): the SQL optimization process drops the PROCTIME() attribute and treats it as an ordinary timestamp type, so the query is not translated into the first-row logic. I have opened an issue to fix this.

thanks,
godfrey
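To make the suggested rewrite concrete, here is a sketch of the first-row variant of SQL3, assuming the same user_log and user_dist definitions as above. Per godfrey's caveat, the 1.9 blink planner may still fail to recognize PROCTIME() here until the issue he mentions is fixed:

-- First-row dedup sketch: ORDER BY proc ASC keeps the earliest row per
-- (dt, user_id, behavior) key, so no retractions are produced and the
-- sink needs no primary key. The original SQL3 used DESC (last row).
INSERT INTO user_dist
SELECT dt, user_id, behavior
FROM (
    SELECT dt, user_id, behavior,
           ROW_NUMBER() OVER (
               PARTITION BY dt, user_id, behavior
               ORDER BY proc ASC
           ) AS rownum
    FROM (
        SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') AS dt,
               user_id, behavior, PROCTIME() AS proc
        FROM user_log
    )
)
WHERE rownum = 1;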
Hi JingsongLee, 晓令:
Thank you both for the answers. For reference, the issue link: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-14899?filter=allissues

叶贤勋
[hidden email]