Hi,
kafka -> Flink -> kafka -> mysql

Flink processes the data with SQL and writes the result to Kafka as JSON; kafka-connect-jdbc then exports the data from Kafka to MySQL. We use kafka-connect so the same data can also be exported to other storage systems.

Flink sink table definition:

CREATE TABLE print_table (
  total_count BIGINT,
  username STRING,
  update_time TIMESTAMP(6)
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_out',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'sink.partitioner' = 'round-robin',
  'format' = 'json'
)

Sample output record:

{"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}

However, the JSON that kafka-connect-jdbc consumes needs a schema and a payload, for example:

{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "int64", "optional": false, "field": "id"},
      {"type": "string", "optional": true, "field": "name"}
    ],
    "optional": true,
    "name": "user"
  },
  "payload": {
    "id": 1,
    "name": "admin"
  }
}

How should this be handled in Flink (by adding the schema and payload?) so that the output matches the JSON format Kafka Connect expects?

Current Flink SQL:

INSERT INTO test_out (total_count, username, update_time)
SELECT count(1) AS total_count,
       username,
       TUMBLE_START(update_time, INTERVAL '1' MINUTE) AS update_time
FROM table1
GROUP BY username, TUMBLE(update_time, INTERVAL '1' MINUTE)
Hi,
You need to add schema and payload in both the DDL and the query:

CREATE TABLE print_table (
  `schema` STRING,
  `payload` ROW<total_count BIGINT, username STRING, update_time TIMESTAMP(6)>
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_out',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'sink.partitioner' = 'round-robin',
  'format' = 'json'
)

-- In the DML, hard-code the schema as a string constant and wrap the payload with the ROW constructor
INSERT INTO print_table
SELECT '{ "type": "struct", ...}' AS `schema`,
       ROW(total_count, username, update_time) AS payload
FROM ...

Btw, may I ask why you need kafka-connect-jdbc to sync the data to MySQL? Wouldn't it be more convenient to write to MySQL directly with Flink SQL?

Best,
Jark
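For reference, the direct route Jark mentions would look roughly like the sketch below, using the JDBC connector that ships with Flink 1.11. The database URL, table name, and credentials here are placeholder assumptions, and the MySQL JDBC driver plus the connector jar must be on the classpath:

CREATE TABLE mysql_out (
  total_count BIGINT,
  username STRING,
  update_time TIMESTAMP(6),
  -- Declaring a primary key (NOT ENFORCED) lets the sink upsert per key instead of append-only insert
  PRIMARY KEY (username, update_time) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://127.0.0.1:3306/test',  -- placeholder database URL
  'table-name' = 'test_out',                   -- placeholder MySQL table
  'username' = 'root',
  'password' = '***'
);

INSERT INTO mysql_out
SELECT count(1) AS total_count,
       username,
       TUMBLE_START(update_time, INTERVAL '1' MINUTE) AS update_time
FROM table1
GROUP BY username, TUMBLE(update_time, INTERVAL '1' MINUTE);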
Hi,
I adjusted the SQL and hit a new problem:

Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unsupported cast from 'ROW<`EXPR$0` BIGINT NOT NULL, `EXPR$1` STRING, `EXPR$2` TIMESTAMP(3) *ROWTIME*> NOT NULL' to 'ROW<`total_count` BIGINT, `username` STRING, `update_time` TIMESTAMP(6)>'.

The time field in the SELECT is defined like this: ... TUMBLE_START(update_time, INTERVAL '1' MINUTE) AS update_time) AS payload

After I changed TIMESTAMP(6) to TIMESTAMP(3), the error went away. So is the time precision of windows in Flink limited to 3 digits?

Thanks
> After I changed TIMESTAMP(6) to TIMESTAMP(3), the error went away. So is the time precision of windows in Flink limited to 3 digits?
The window time is being used as the time attribute column, so it can only be TIMESTAMP(3). Ordinary TIMESTAMP fields in Flink can go up to TIMESTAMP(9).

Best,
Leonard
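Concretely, the fix is to match the window's 3-digit precision in the envelope table from earlier, as in this sketch (same Kafka options as before); casting the field up to TIMESTAMP(6) before building the ROW might also work, but that is an untested assumption:

CREATE TABLE print_table (
  `schema` STRING,
  -- TIMESTAMP(3) matches the precision of TUMBLE_START on a rowtime attribute
  `payload` ROW<total_count BIGINT, username STRING, update_time TIMESTAMP(3)>
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_out',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'sink.partitioner' = 'round-robin',
  'format' = 'json'
);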
In reply to this post by Jark
Hi,
Ah, I realize this isn't quite right: `schema` needs to be a JSON object (a dict), not a STRING. How can that be defined in SQL?

Thanks
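One possible direction, as an untested sketch: model the Kafka Connect schema envelope itself as nested ROW and ARRAY types and build it with row/array constructors, so the json format serializes it as a real JSON object rather than an escaped string. The field layout below mirrors the example from the first message; transporting update_time as a Connect "string" (rather than a logical timestamp type) is an assumption that would need verifying against kafka-connect-jdbc:

CREATE TABLE test_out (
  -- Nested type mirroring the Connect envelope: {"type", "fields": [...], "optional", "name"}
  `schema` ROW<
    `type` STRING,
    `fields` ARRAY<ROW<`type` STRING, `optional` BOOLEAN, `field` STRING>>,
    `optional` BOOLEAN,
    `name` STRING>,
  `payload` ROW<total_count BIGINT, username STRING, update_time TIMESTAMP(3)>
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_out',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'sink.partitioner' = 'round-robin',
  'format' = 'json'
);

INSERT INTO test_out
SELECT
  ROW('struct',
      ARRAY[ROW('int64', false, 'total_count'),
            ROW('string', true, 'username'),
            ROW('string', true, 'update_time')],  -- 'string' for the timestamp is an assumption
      true,
      'user') AS `schema`,
  ROW(count(1), username, TUMBLE_START(update_time, INTERVAL '1' MINUTE)) AS payload
FROM table1
GROUP BY username, TUMBLE(update_time, INTERVAL '1' MINUTE);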