Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.user_cnt do not match.
Query schema: [time: STRING, age: BYTES]
Sink schema: [time: STRING, sum_age: INT]
Hi,
It looks like the schema of your query does not match the schema of the sink table. Data in HBase is always stored as bytes, but in Flink SQL you generally should not need to read raw bytes: the values you read should already have the corresponding Flink SQL types, such as INT, BIGINT, STRING, and so on. Could you paste your SQL?

Best regards,
Leonard Xu

On June 15, 2020, at 19:55, Zhou Zach <[hidden email]> wrote:
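To illustrate the point about HBase storing bytes: a column read as BYTES is still just the raw cell value and has to be decoded before it is an INT. The sketch below uses only the JDK and assumes `age` was written with HBase's `Bytes.toBytes(int)`, which produces a 4-byte big-endian array; the helper names `decodeAge` and `encodeAge` are hypothetical.

```scala
import java.nio.ByteBuffer

object DecodeAge {
  // Hypothetical helper: decode a 4-byte big-endian cell value into an Int.
  // Assumes the value was written with HBase's Bytes.toBytes(int).
  def decodeAge(cell: Array[Byte]): Int = {
    require(cell.length == 4, s"expected 4 bytes, got ${cell.length}")
    ByteBuffer.wrap(cell).getInt // ByteBuffer is big-endian by default
  }

  // Inverse direction, mirroring how an Int ends up stored in the HBase cell.
  def encodeAge(age: Int): Array[Byte] =
    ByteBuffer.allocate(4).putInt(age).array()

  def main(args: Array[String]): Unit = {
    val raw = encodeAge(30)
    println(raw.mkString("[", ", ", "]")) // [0, 0, 0, 30]
    println(decodeAge(raw))               // 30
  }
}
```

In the Flink job you would not decode by hand; declaring the column with the right type lets the connector do this conversion.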
```scala
// Imports assume Flink 1.10 (Blink planner, legacy HBase connector)
import org.apache.flink.addons.hbase.HBaseTableSource
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.hadoop.conf.Configuration

val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)

val conf = new Configuration
val users = new HBaseTableSource(conf, "user_hbase3")
users.setRowKey("rowkey", classOf[String]) // rowkey as the primary key
users.addColumn("cf", "age", classOf[Array[Byte]])

streamTableEnv.registerTableSource("users", users)

streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_behavior (
    |  uid VARCHAR,
    |  phoneType VARCHAR,
    |  clickCount INT,
    |  `time` TIMESTAMP(3)
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  'connector.topic' = 'user_behavior',
    |  'connector.startup-mode' = 'earliest-offset',
    |  'connector.properties.0.key' = 'zookeeper.connect',
    |  'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'connector.properties.1.key' = 'bootstrap.servers',
    |  'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'update-mode' = 'append',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_cnt (
    |  `time` VARCHAR,
    |  sum_age INT
    |) WITH (
    |  'connector.type' = 'jdbc',
    |  'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
    |  'connector.table' = 'user_cnt',
    |  'connector.username' = 'root',
    |  'connector.password' = '123456',
    |  'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)

streamTableEnv.sqlUpdate(
  """
    |insert into user_cnt
    |SELECT
    |  cast(b.`time` as string) as `time`, u.age
    |FROM
    |  (select *, PROCTIME() AS proctime from user_behavior) AS b
    |  JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
    |  ON b.uid = u.rowkey
    |""".stripMargin)
```

On 2020-06-15 20:01:16, "Leonard Xu" <[hidden email]> wrote:
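Why the query reports `age: BYTES`: the class passed to `addColumn` determines the SQL type of that column, and `classOf[Array[Byte]]` surfaces as BYTES. The sketch below is a simplified, hypothetical model of the check behind the error, not Flink's actual code: it compares query and sink field types position by position and ignores field names (so `age` may feed `sum_age`), and its class-to-type table covers only the types used in this thread.

```scala
object SinkSchemaCheck {
  // Simplified, hypothetical mapping from the class declared on the HBase
  // source to the SQL type reported in the query schema.
  def sqlType(cls: Class[_]): String = cls match {
    case c if c == classOf[String]      => "STRING"
    case c if c == classOf[Integer]     => "INT"
    case c if c == classOf[Array[Byte]] => "BYTES"
    case c => throw new IllegalArgumentException(s"unmapped class: ${c.getName}")
  }

  // Fields are matched by position; only the types are compared, names are ignored.
  def typesMatch(query: Seq[(String, String)], sink: Seq[(String, String)]): Boolean =
    query.length == sink.length &&
      query.zip(sink).forall { case ((_, queryType), (_, sinkType)) => queryType == sinkType }

  def main(args: Array[String]): Unit = {
    val sink    = Seq("time" -> "STRING", "sum_age" -> "INT")
    val asBytes = Seq("time" -> "STRING", "age" -> sqlType(classOf[Array[Byte]]))
    val asInt   = Seq("time" -> "STRING", "age" -> sqlType(classOf[Integer]))
    println(typesMatch(asBytes, sink)) // false
    println(typesMatch(asInt, sink))   // true
  }
}
```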
The HBase dimension table:
```scala
streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_hbase3(
    |  rowkey string,
    |  cf ROW(sex VARCHAR, age INT, created_time TIMESTAMP(3))
    |) WITH (
    |  'connector.type' = 'hbase',
    |  'connector.version' = '2.1.0',
    |  'connector.table-name' = 'user_hbase3',
    |  'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'connector.zookeeper.znode.parent' = '/hbase',
    |  'connector.write.buffer-flush.max-size' = '10mb',
    |  'connector.write.buffer-flush.max-rows' = '1000',
    |  'connector.write.buffer-flush.interval' = '2s'
    |)
    |""".stripMargin)
```

At 2020-06-15 20:19:22, "Zhou Zach" <[hidden email]> wrote:
Hi,
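I see you declare the HBase table in two places.

> | cf ROW(sex VARCHAR, age INT, created_time TIMESTAMP(3))

This approach should be fine.

> users.addColumn("cf", "age", classOf[Array[Byte]])

Why did you declare the data type of age as classOf[Array[Byte]] here? Did you forget to change it? Using users.addColumn("cf", "age", classOf[Integer]) here should work.

Register the table either through DDL or on the TableEnvironment; one of the two is enough. I recommend DDL.

Best regards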
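Following the recommendation to keep only the DDL, here is a minimal sketch of the lookup join written directly against `user_hbase3`, so the `HBaseTableSource` registration can be dropped. It assumes the `user_hbase3` DDL is accepted unchanged by your Flink version's HBase connector (check the supported `connector.version` values) and that `age` is stored as a 4-byte int. Nested columns of the `cf` ROW are addressed as `u.cf.age`, which is already INT. Only the SQL string is shown; in the job it would be passed to `streamTableEnv.sqlUpdate` as in the original code.

```scala
object DdlOnlyJoin {
  // INSERT joining the Kafka stream with the HBase table declared via DDL.
  // u.cf.age reads the `age` qualifier from the `cf` column family ROW.
  val insertSql: String =
    """
      |INSERT INTO user_cnt
      |SELECT
      |  CAST(b.`time` AS STRING) AS `time`,
      |  u.cf.age
      |FROM
      |  (SELECT *, PROCTIME() AS proctime FROM user_behavior) AS b
      |  JOIN user_hbase3 FOR SYSTEM_TIME AS OF b.proctime AS u
      |  ON b.uid = u.rowkey
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    // In the Flink job: streamTableEnv.sqlUpdate(insertSql)
    println(insertSql)
  }
}
```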