Flink version tested: 1.11.0

Does Flink SQL support this insert syntax, which specifies both the column order and the columns to insert into?

    Insert into tableName(col1[,col2]) select col1[,col2]

Testing so far has turned up the following problems.

Table definitions:

    create table t1(a int,b string,c int) with ();
    create table t2(a int,b string,c int) with ();

Problem 1: With insert into, the query schema is matched against the sink schema by the order in which the columns are defined.

Test statement:

    insert into t2 select t1.a,t1.c, t1.b from t1;

Error message:

    org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.t2 do not match.
    Query schema: [a: INT, c: INT, b: VARCHAR(2147483647)]
    Sink schema: [a: INT, b: VARCHAR(2147483647), c: INT]
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:213)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:98)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
        at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:632)

Problem 2: The syntax Insert into tableName(col1[,col2]) select col1[,col2] is accepted, but it has no real effect; columns are still matched by definition order.

Test statement:

    insert into t2(a,c,b) select t1.a,t1.c, t1.b from t1;

Error message:

    org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.t2 do not match.
    Query schema: [a: INT, c: INT, b: VARCHAR(2147483647)]
    Sink schema: [a: INT, b: VARCHAR(2147483647), c: INT]
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:213)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:98)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
        at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:632)

Problem 3: The same error occurs when the insert into column list has fewer columns than the sink schema.

Test statement:

    insert into t2(a,b) select t1.a, t1.b from t1;

Error message:

    org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.t2 do not match.
    Query schema: [a: INT, c: VARCHAR(2147483647)]
    Sink schema: [a: INT, b: VARCHAR(2147483647), c: INT]
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:213)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:98)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
        at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:632)

Summary:
The current implementation limits the flexibility of both queries and writes. An insert only succeeds when the select list follows the field order defined in the sink schema, which is tedious when the table has many fields. There is also a real need to insert into only some of the columns, and that is not supported at the moment.
Hi,

Flink indeed does not support this syntax at the moment... I have created an issue [1] where you can track the progress of this feature.

[1] https://issues.apache.org/jira/browse/FLINK-18726

On Mon, Jul 27, 2020 at 11:36 AM, <[hidden email]> wrote:

> Flink version tested: 1.11.0
>
> Does Flink SQL support this insert syntax, which specifies both the column order and the columns to insert into?
>
> Insert into tableName(col1[,col2]) select col1[,col2]
>
> [...]
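Until the linked issue is resolved, one possible workaround is to write the select list in the sink's definition order by hand, padding any column that is not being written with a typed NULL. When a table has many fields, that statement can be generated rather than typed. The sketch below is only an illustration: `build_insert` is a hypothetical helper (not a Flink API), the schema list is maintained by hand, and it assumes the sink columns are nullable and that the planner in use accepts `CAST(NULL AS <type>)`.

```python
from typing import Dict, List, Tuple


def build_insert(
    sink_table: str,
    sink_schema: List[Tuple[str, str]],
    source_table: str,
    expr_by_column: Dict[str, str],
) -> str:
    """Build an INSERT whose select list follows the sink's definition order.

    sink_schema holds (column name, SQL type) pairs in the order used in the
    CREATE TABLE statement. expr_by_column maps a sink column name to the
    source expression that should be written into it. Columns without an
    expression are padded with a typed NULL (assumes nullable sink columns).
    """
    known = {name for name, _ in sink_schema}
    unknown = set(expr_by_column) - known
    if unknown:
        raise ValueError(f"columns not in sink schema: {sorted(unknown)}")

    projections = []
    for name, sql_type in sink_schema:
        if name in expr_by_column:
            projections.append(f"{expr_by_column[name]} AS {name}")
        else:
            projections.append(f"CAST(NULL AS {sql_type}) AS {name}")

    return (
        f"INSERT INTO {sink_table} "
        f"SELECT {', '.join(projections)} FROM {source_table}"
    )


# Schema of t2 from the thread, in definition order.
T2_SCHEMA = [("a", "INT"), ("b", "STRING"), ("c", "INT")]

# Problem 1/2: columns supplied in a different order than the sink defines.
reordered = build_insert(
    "t2", T2_SCHEMA, "t1", {"a": "t1.a", "c": "t1.c", "b": "t1.b"}
)

# Problem 3: only a and b are written; c is padded with a typed NULL.
partial = build_insert("t2", T2_SCHEMA, "t1", {"a": "t1.a", "b": "t1.b"})

print(reordered)
print(partial)
```

The generated strings can then be submitted like any other statement. Because matching is positional, the column aliases in the select list are there for readability only.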