Flink SQL: how to convert a BYTES value read from HBase to INT

Flink SQL: how to convert a BYTES value read from HBase to INT

Zhou Zach




Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.user_cnt do not match.
Query schema: [time: STRING, age: BYTES]
Sink schema: [time: STRING, sum_age: INT]

Re: Flink SQL: how to convert a BYTES value read from HBase to INT

Leonard Xu
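Hi,
It looks like the schema of your query and the schema of the sink table do not match. HBase stores all data as bytes, but in Flink SQL you normally do not need to read raw bytes: the values you read should already have the corresponding Flink SQL types, such as INT, BIGINT, or STRING. Could you post your SQL?

Best,
Leonard Xu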

> On 2020-06-15 at 19:55, Zhou Zach <[hidden email]> wrote:
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.user_cnt do not match.
> Query schema: [time: STRING, age: BYTES]
> Sink schema: [time: STRING, sum_age: INT]
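
The point in the reply above, that HBase keeps every cell as raw bytes, can be illustrated with a small Scala sketch. It assumes the age cell was written as a 4-byte big-endian integer, which is the layout HBase's Bytes.toBytes(int) produces; if the value was written as text (for example through the HBase shell), the cell holds the characters of the number instead and this decoding does not apply. HBaseIntCodec, encodeInt and decodeInt are hypothetical names used only for this illustration.

```scala
import java.nio.ByteBuffer

// Hypothetical helper, not part of Flink or HBase.
object HBaseIntCodec {
  // Encode an Int as 4 big-endian bytes (ByteBuffer defaults to big-endian),
  // the same layout assumed for the HBase cell.
  def encodeInt(value: Int): Array[Byte] =
    ByteBuffer.allocate(4).putInt(value).array()

  // Decode a 4-byte big-endian cell value back into an Int.
  def decodeInt(bytes: Array[Byte]): Int = {
    require(bytes.length == 4, s"expected 4 bytes, got ${bytes.length}")
    ByteBuffer.wrap(bytes).getInt()
  }

  def main(args: Array[String]): Unit = {
    val cell = encodeInt(25)   // stand-in for the bytes stored under cf:age
    println(decodeInt(cell))   // prints 25
  }
}
```

When the column is declared with an integer type, the connector is expected to do this decoding itself, which is why the query normally never needs to handle BYTES directly.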


Re:Re: Flink SQL: how to convert a BYTES value read from HBase to INT

Zhou Zach
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)

val conf = new Configuration
val users = new HBaseTableSource(conf, "user_hbase3")
users.setRowKey("rowkey", classOf[String]) // currency as the primary key
users.addColumn("cf", "age", classOf[Array[Byte]])

streamTableEnv.registerTableSource("users", users)


streamTableEnv.sqlUpdate(
"""
    |
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    `time` TIMESTAMP(3)
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

streamTableEnv.sqlUpdate(
"""
    |
    |CREATE TABLE user_cnt (
    |    `time` VARCHAR,
    |    sum_age INT
    |) WITH (
    |    'connector.type' = 'jdbc',
    |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
    |    'connector.table' = 'user_cnt',
    |    'connector.username' = 'root',
    |    'connector.password' = '123456',
    |    'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)


streamTableEnv.sqlUpdate(
"""
    |
    |insert into  user_cnt
    |SELECT
    |  cast(b.`time` as string) as `time`,  u.age
    |FROM
    |  (select * , PROCTIME() AS proctime from user_behavior) AS b
    |  JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
    |  ON b.uid = u.rowkey
    |
    |""".stripMargin)

At 2020-06-15 20:01:16, "Leonard Xu" <[hidden email]> wrote:

>Hi,
>It looks like the schema of your query and the schema of the sink table do not match. HBase stores all data as bytes, but in Flink SQL you normally do not need to read raw bytes: the values you read should already have the corresponding Flink SQL types, such as INT, BIGINT, or STRING. Could you post your SQL?
>
>Best,
>Leonard Xu

Re:Re:Re: Flink SQL: how to convert a BYTES value read from HBase to INT

Zhou Zach
The dimension table in HBase:


streamTableEnv.sqlUpdate(
"""
    |
    |CREATE TABLE user_hbase3(
    |    rowkey string,
    |    cf ROW(sex VARCHAR, age INT, created_time TIMESTAMP(3))
    |) WITH (
    |    'connector.type' = 'hbase',
    |    'connector.version' = '2.1.0',
    |    'connector.table-name' = 'user_hbase3',
    |    'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.zookeeper.znode.parent' = '/hbase',
    |    'connector.write.buffer-flush.max-size' = '10mb',
    |    'connector.write.buffer-flush.max-rows' = '1000',
    |    'connector.write.buffer-flush.interval' = '2s'
    |)
    |""".stripMargin)

At 2020-06-15 20:19:22, "Zhou Zach" <[hidden email]> wrote:
>[...]

Re: Flink SQL: how to convert a BYTES value read from HBase to INT

Leonard Xu
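Hi,
I see that you declare the HBase table in two places.

>    |    cf ROW(sex VARCHAR, age INT, created_time TIMESTAMP(3))

Declaring it this way should be fine.

> users.addColumn("cf", "age", classOf[Array[Byte]])

Why is the data type of age declared as classOf[Array[Byte]] here? Did you forget to change it?
Using users.addColumn("cf", "age", classOf[Integer]) here should be enough.

You only need one of the two: either declare the table through DDL or register it on the TableEnvironment. I recommend the DDL.

Best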
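
A sketch of the DDL-only variant recommended above, not tested against a live cluster. It continues the snippets from the thread and assumes that streamTableEnv exists, that user_behavior, user_cnt and user_hbase3 were created by the DDL statements shown earlier, and that the HBaseTableSource registration of "users" is dropped. The sum_age alias is added only for readability; age is reached through the cf ROW column, where the DDL already declares it as INT.

```scala
// Sketch only: relies on streamTableEnv and the three DDL tables from the
// earlier messages (Flink 1.10-style sqlUpdate API, as used in the thread).
streamTableEnv.sqlUpdate(
  """
    |INSERT INTO user_cnt
    |SELECT
    |  CAST(b.`time` AS STRING) AS `time`,
    |  u.cf.age AS sum_age
    |FROM
    |  (SELECT *, PROCTIME() AS proctime FROM user_behavior) AS b
    |  JOIN user_hbase3 FOR SYSTEM_TIME AS OF b.`proctime` AS u
    |  ON b.uid = u.rowkey
    |""".stripMargin)
```

With this form the query schema should come out as [time: STRING, sum_age: INT], matching the sink, because the join reads from the DDL table instead of the source registered with classOf[Array[Byte]].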