[Flink SQL] Question about the INSERT INTO statement in Flink SQL


silence-2
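Flink version tested: 1.11.0

Does Flink SQL support the following insert syntax, where the INSERT statement names the target columns and their order explicitly?

Insert into tableName(col1[,col2]) select col1[,col2]

Testing so far has turned up the following problems.

Table definitions: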

create table t1(a int,b string,c int) with ();

create table t2(a int,b string,c int) with ();

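Problem 1: with insert into, the query schema is matched against the sink schema by position, in the order the sink columns were defined.

Test statement:

insert into t2 select t1.a,t1.c, t1.b from  t1;

Error message: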

org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.t2 do not match.
Query schema: [a: INT, c: INT, b: VARCHAR(2147483647)]
Sink schema: [a: INT, b: VARCHAR(2147483647), c: INT]

    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:213)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:98)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:632)
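
To make the positional matching in Problem 1 concrete, here is a small Python sketch of a check that compares two schemas by position only. It is a simplified model and not Flink's code: the function name positional_mismatches and the (name, type) tuples are made up for illustration, and it compares type strings exactly, whereas the real validation step also applies implicit casts.

```python
from typing import List, Tuple

Column = Tuple[str, str]  # (field name, type string)


def positional_mismatches(query: List[Column], sink: List[Column]) -> List[str]:
    """Compare query and sink schemas by position only; field names are ignored."""
    if len(query) != len(sink):
        return [f"column count differs: query has {len(query)}, sink has {len(sink)}"]
    problems = []
    for index, ((q_name, q_type), (s_name, s_type)) in enumerate(zip(query, sink)):
        # Assumption: exact string comparison stands in for the real type check.
        if q_type != s_type:
            problems.append(
                f"position {index}: query {q_name}: {q_type} vs sink {s_name}: {s_type}"
            )
    return problems


# Schemas taken from the error message above.
sink_schema = [("a", "INT"), ("b", "VARCHAR(2147483647)"), ("c", "INT")]
query_schema = [("a", "INT"), ("c", "INT"), ("b", "VARCHAR(2147483647)")]

for problem in positional_mismatches(query_schema, sink_schema):
    print(problem)
```

Under this model the select list a, c, b fails at the second and third positions even though every field name exists in the sink, which matches the error reported above.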

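Problem 2: the syntax Insert into tableName(col1[,col2]) select col1[,col2] is accepted, but the column list has no real effect; columns are still matched in definition order.

Test statement:

insert into t2(a,c,b) select t1.a,t1.c, t1.b from  t1;

Error message: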

org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.t2 do not match.
Query schema: [a: INT, c: INT, b: VARCHAR(2147483647)]
Sink schema: [a: INT, b: VARCHAR(2147483647), c: INT]

    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:213)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:98)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:632)

Problem 3: the same error occurs when the insert into column list has fewer fields than the sink schema.

Test statement:

insert into t2(a,b)
select t1.a, t1.b from t1;

Error message:

org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.t2 do not match.
Query schema: [a: INT, c: VARCHAR(2147483647)]
Sink schema: [a: INT, b: VARCHAR(2147483647), c: INT]

    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:213)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:98)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:632)

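Summary:

The current implementation limits the flexibility of both the query and the write. An insert only succeeds when the select list follows the field order defined in the sink schema, which is tedious when a table has many fields. There is also a real need to insert into only some of the columns, and that is currently not supported.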
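
Until a column list is honored, one possible workaround is to generate the select list in sink order and pad the columns that are not being written. The Python sketch below does that; build_insert and its parameters are hypothetical names, not part of Flink. Padding with CAST(NULL AS type) is an assumption that only makes sense when the sink columns are nullable.

```python
from typing import Dict, List, Tuple


def build_insert(sink_table: str, sink_schema: List[Tuple[str, str]],
                 source_table: str, mapping: Dict[str, str]) -> str:
    """Build a positional INSERT whose select list follows the sink column order.

    mapping: sink column name -> source expression. Unmapped sink columns are
    padded with a typed NULL (assumption: those sink columns accept NULL).
    """
    unknown = set(mapping) - {name for name, _ in sink_schema}
    if unknown:
        raise ValueError(f"columns not in sink schema: {sorted(unknown)}")
    select_items = []
    for name, sql_type in sink_schema:
        if name in mapping:
            select_items.append(mapping[name])
        else:
            select_items.append(f"CAST(NULL AS {sql_type})")
    return f"INSERT INTO {sink_table} SELECT {', '.join(select_items)} FROM {source_table}"


t2_schema = [("a", "INT"), ("b", "STRING"), ("c", "INT")]

# Problem 2: the order in which the mapping is written no longer matters.
print(build_insert("t2", t2_schema, "t1", {"a": "t1.a", "c": "t1.c", "b": "t1.b"}))

# Problem 3: only a and b are supplied, c is padded.
print(build_insert("t2", t2_schema, "t1", {"a": "t1.a", "b": "t1.b"}))
```

The generated statements always list expressions in the order a, b, c, so they should pass the positional check described in Problem 1.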


Re: [Flink SQL] Question about the INSERT INTO statement in Flink SQL

Caizhi Weng
Hi,

Flink indeed does not support this syntax at the moment. I have created an issue [1] where you can track the progress of this feature.

[1] https://issues.apache.org/jira/browse/FLINK-18726

In reply to the message from <[hidden email]> of Monday, July 27, 2020, 11:36 AM.