Following the demo in Jark's blog, I wrote a Table API/SQL program, but it fails when converting the Table to an append stream.
Flink version: 1.10. The code:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // the old planner will be removed in a future version
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    tableEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
            "    user_id BIGINT,\n" +
            "    item_id BIGINT,\n" +
            "    category_id BIGINT,\n" +
            "    behavior STRING,\n" +
            "    ts TIMESTAMP(3),\n" +
            "    proctime as PROCTIME(),  -- declare a processing-time column via a computed column\n" +
            "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- define a watermark on ts, making it the event-time column\n" +
            ") WITH (\n" +
            "    'connector.type' = 'kafka',  -- use the kafka connector\n" +
            "    'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above\n" +
            "    'connector.topic' = 'user_behavior',  -- kafka topic\n" +
            "    'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset\n" +
            "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address\n" +
            "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address\n" +
            "    'format.type' = 'json'  -- source data format is json\n" +
            ")");
    Table table1 = tableEnv.sqlQuery("select user_id, item_id, category_id, behavior, ts, " +
            "proctime from user_behavior where behavior = 'buy'");
    tableEnv.toAppendStream(table1, Behavior.class).print();
    env.execute();
}

public class Behavior {
    public Long user_id;
    public Long item_id;
    public Long category_id;
    public String behavior;
    public Timestamp ts;
    public Timestamp proctime;

    @Override
    public String toString() {
        return "Behavior{" +
                "user_id=" + user_id +
                ", item_id=" + item_id +
                ", category_id=" + category_id +
                ", behavior='" + behavior + '\'' +
                ", ts=" + ts +
                ", proctime=" + proctime +
                '}';
    }
}

The error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink do not match.
Query schema: [user_id: BIGINT, item_id: BIGINT, category_id: BIGINT, behavior: STRING, ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT NULL *PROCTIME*]
Sink schema:  [behavior: STRING, category_id: BIGINT, item_id: BIGINT, proctime: TIMESTAMP(3), ts: TIMESTAMP(3), user_id: BIGINT]
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
    at sql.KafkaSourceTable.main(KafkaSourceTable.java:35)

The POJO's field types are declared to match the source table's column types, so why does validation still trip on NOT NULL *PROCTIME* and *ROWTIME*?
This looks like a bug that has already been fixed (FLINK-16108).
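If you need to stay on 1.10.0 for the moment, one possible workaround (just a sketch, not verified against your exact setup) is to CAST the two time-attribute columns to plain TIMESTAMP(3) in the query. The cast materializes the *ROWTIME*/*PROCTIME* attributes into ordinary timestamp columns before the conversion to a DataStream, which should sidestep the failing validation:

Table table1 = tableEnv.sqlQuery(
        "select user_id, item_id, category_id, behavior, " +
        // casting materializes the event-time / processing-time attributes
        // into plain TIMESTAMP(3) columns that the POJO fields can match
        "cast(ts as TIMESTAMP(3)) as ts, " +
        "cast(proctime as TIMESTAMP(3)) as proctime " +
        "from user_behavior where behavior = 'buy'");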
Could you try again with release-1.10.1, which is currently going through RC voting? https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc2/

Best,
Jark
OK, I'll give it a try. Thanks!
Sun.Zhu
Email: [hidden email]