Re: flinkSQL1.11写出数据到jdbc fleld type do not match

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Re: flinkSQL1.11写出数据到jdbc fleld type do not match

Benchao Li-2
你的source跟sink的字段数量都不一样多,你需要让insert的语句的query的table schema跟sink表的schema相同才可以。
比如可以用下面的SQL来写入:
```SQL
insert into cloud_behavior_sink
select
    operation,
    operation_channel,
    ip,
    lat,
    lng,
    user_id,
    device_id
from cloud_behavior_source;
```

奔跑的小飞袁 <[hidden email]> 于2020年10月19日周一 下午4:29写道:

> hello
> 我在使用flinksql1.11写出数据到jdbc是遇到了field type类型不匹配的问题,是我类型设置有问题吗?
> 下面是我的异常日志以及sql文件
>
> SET stream.enableCheckpointing=1000*60;
> SET stream.setParallelism=3;
>
> -- Kafka cdbp zdao source 表
> create TABLE cloud_behavior_source(
>     operation STRING,
>     operation_channel STRING,
>     `time` STRING,
>     ip STRING,
>     lat STRING,
>     lng STRING,
>     user_id STRING,
>     device_id STRING,
>     imei STRING,
>     targets ARRAY<ROW&lt;`type` STRING,`value` STRING>>,
>     product_name STRING,
>     product_version STRING,
>     product_vendor STRING,
>     platform STRING,
>     platform_version STRING,
>     `languaage` STRING,
>     locale STRING,
>     other_para MAP<STRING, STRING NULL>
> ) with (
>     'connector'='kafka',
>     'topic'='cloud_behavior',
>     'properties.bootstrap.servers'='',
>     'properties.group.id'='testGroup',
>     'format'='avro',
>     'scan.startup.mode'='earliest-offset'
> );
>
> -- Hbase zdao uv 统计 Sink 表
> create TABLE cloud_behavior_sink(
>     operation STRING,
>     operation_channel STRING,
>     ip STRING,
>     lat STRING,
>     lng STRING,
>     user_id STRING,
>     device_id STRING
> ) with (
>     'connector'='jdbc',
>     'url'='jdbc:mysql://hosts:3306/d_bigdata',
>     'table-name'='flink_sql_test',
>     'username'='',
>     'password'='',
>     'sink.buffer-flush.max-rows'='100'
> );
>
> -- 业务过程
> insert into cloud_behavior_sink
> select
>      *
> from cloud_behavior_source;
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
>
> [jar:file:/data1/flink/flink-1.11.1-log/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
>
> ------------------------------------------------------------
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Field types of query result and registered TableSink
> default_catalog.default_database.cloud_behavior_sink do not match.
> Query schema: [operation: VARCHAR(2147483647), operation_channel:
> VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647),
> lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id:
> VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei:
> VARCHAR(2147483647), targets: ARRAY<ROW&lt;`type` VARCHAR(2147483647),
> `value` VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647),
> product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647),
> platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647),
> languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para:
> MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
> Sink schema: [operation: VARCHAR(2147483647), operation_channel:
> VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647),
> lng:
> VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id:
> VARCHAR(2147483647)]
>         at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>         at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>         at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>         at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>         at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>         at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>         at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
>         at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>         at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: org.apache.flink.table.api.ValidationException: Field types of
> query result and registered TableSink
> default_catalog.default_database.cloud_behavior_sink do not match.
> Query schema: [operation: VARCHAR(2147483647), operation_channel:
> VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647),
> lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id:
> VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei:
> VARCHAR(2147483647), targets: ARRAY<ROW&lt;`type` VARCHAR(2147483647),
> `value` VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647),
> product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647),
> platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647),
> languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para:
> MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
> Sink schema: [operation: VARCHAR(2147483647), operation_channel:
> VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647),
> lng:
> VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id:
> VARCHAR(2147483647)]
>         at
>
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
>         at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
>         at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
>         at scala.Option.map(Option.scala:146)
>         at
>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>         at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>         at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>         at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at
>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>         at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>         at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>         at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>         at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>         at
>
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97)
>         at
>
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72)
>         at
>
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
>         at
>
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>         ... 11 more
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


--

Best,
Benchao Li