Your source and sink do not even have the same number of fields. The schema of the query in your INSERT statement must match the schema of the sink table.
For example, you could write the data with the following SQL:

```SQL
insert into cloud_behavior_sink
select
    operation,
    operation_channel,
    ip,
    lat,
    lng,
    user_id,
    device_id
from cloud_behavior_source;
```

奔跑的小飞袁 <[hidden email]> wrote on Monday, October 19, 2020 at 4:29 PM:

> Hello,
> I ran into a field type mismatch when writing data to JDBC with Flink SQL 1.11. Is there something wrong with my type settings?
> My exception log and SQL file are below.
>
> SET stream.enableCheckpointing=1000*60;
> SET stream.setParallelism=3;
>
> -- Kafka cdbp zdao source table
> create TABLE cloud_behavior_source(
>     operation STRING,
>     operation_channel STRING,
>     `time` STRING,
>     ip STRING,
>     lat STRING,
>     lng STRING,
>     user_id STRING,
>     device_id STRING,
>     imei STRING,
>     targets ARRAY<ROW<`type` STRING,`value` STRING>>,
>     product_name STRING,
>     product_version STRING,
>     product_vendor STRING,
>     platform STRING,
>     platform_version STRING,
>     `languaage` STRING,
>     locale STRING,
>     other_para MAP<STRING, STRING NULL>
> ) with (
>     'connector'='kafka',
>     'topic'='cloud_behavior',
>     'properties.bootstrap.servers'='',
>     'properties.group.id'='testGroup',
>     'format'='avro',
>     'scan.startup.mode'='earliest-offset'
> );
>
> -- Hbase zdao uv statistics sink table
> create TABLE cloud_behavior_sink(
>     operation STRING,
>     operation_channel STRING,
>     ip STRING,
>     lat STRING,
>     lng STRING,
>     user_id STRING,
>     device_id STRING
> ) with (
>     'connector'='jdbc',
>     'url'='jdbc:mysql://hosts:3306/d_bigdata',
>     'table-name'='flink_sql_test',
>     'username'='',
>     'password'='',
>     'sink.buffer-flush.max-rows'='100'
> );
>
> -- Business logic
> insert into cloud_behavior_sink
> select
>     *
> from cloud_behavior_source;
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/data1/flink/flink-1.11.1-log/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
>
> ------------------------------------------------------------
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field types of query result and registered TableSink default_catalog.default_database.cloud_behavior_sink do not match.
> Query schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei: VARCHAR(2147483647), targets: ARRAY<ROW<`type` VARCHAR(2147483647), `value` VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647), product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647), platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647), languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para: MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
> Sink schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647)]
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.cloud_behavior_sink do not match.
> Query schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei: VARCHAR(2147483647), targets: ARRAY<ROW<`type` VARCHAR(2147483647), `value` VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647), product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647), platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647), languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para: MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
> Sink schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647)]
>     at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
>     at scala.Option.map(Option.scala:146)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>     at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97)
>     at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72)
>     at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
>     at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>     ... 11 more
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

--

Best,
Benchao Li