The goal is to implement Canal-server-like functionality with a Flink job.
CREATE TABLE `binlog_table` (
  `id` INT,
  `name` STRING,
  `sys_id` STRING,
  `sequence` INT,
  `filter` STRING,
  `tag` STRING,
  `remark` STRING,
  `create_date` TIMESTAMP,
  `update_date` TIMESTAMP,
  `reserve` STRING,
  `sys_name` STRING,
  `metric_seq` INT,
  `advanced_function` STRING,
  `value_type` STRING,
  `value_field` STRING,
  `status` INT,
  `syn_date` TIMESTAMP,
  `confirmer` STRING,
  `confirm_time` TIMESTAMP,
  `index_explain` STRING,
  `field_name` STRING,
  `tag_values` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '${mysql.hostname}',
  'port' = '3306',
  'username' = '${mysql.username}',
  'password' = '${mysql.password}',
  'database-name' = '${mysql.database}',
  'table-name' = '${mysql.table}'
);

CREATE TABLE `kafka_sink` (
  `id` INT,
  `name` STRING,
  `sys_id` STRING,
  `sequence` INT,
  `filter` STRING,
  `tag` STRING,
  `remark` STRING,
  `create_date` TIMESTAMP,
  `update_date` TIMESTAMP,
  `reserve` STRING,
  `sys_name` STRING,
  `metric_seq` INT,
  `advanced_function` STRING,
  `value_type` STRING,
  `value_field` STRING,
  `status` INT,
  `syn_date` TIMESTAMP,
  `confirmer` STRING,
  `confirm_time` TIMESTAMP,
  `index_explain` STRING,
  `field_name` STRING,
  `tag_values` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = '${topic}',
  'properties.bootstrap.servers' = '${bootstrap.servers}',
  'format' = 'canal-json'
);

INSERT INTO `kafka_sink`
SELECT * FROM `binlog_table`;

The output looks like this:

{
  "data": [
    {
      "id": 3,
      "name": "自动付款接口BuyETC金额",
      "sys_id": "0184",
      "sequence": 2,
      "filter": "(a=1)",
      "tag": "MerId(商户号)",
      "remark": "O",
      "create_date": "2020-11-02 15:01:31",
      "update_date": "2021-04-07 09:23:59",
      "reserve": "",
      "sys_name": "NHL",
      "metric_seq": 0,
      "advanced_function": "",
      "value_type": "sum",
      "value_field": "value",
      "status": 1,
      "syn_date": "2021-01-28 19:31:36",
      "confirmer": null,
      "confirm_time": null,
      "index_explain": "aa",
      "field_name": null,
      "tag_values": null
    }
  ],
  "type": "INSERT"
}

This is not the standard Canal JSON format. Switching to the upsert-kafka connector didn't work either:

CREATE TABLE `kafka_sink` (
  `id` INT,
  `name` STRING,
  `sys_id` STRING,
  `sequence` INT,
  `filter` STRING,
  `tag` STRING,
  `remark` STRING,
  `create_date` TIMESTAMP,
  `update_date` TIMESTAMP,
  `reserve` STRING,
  `sys_name` STRING,
  `metric_seq` INT,
  `advanced_function` STRING,
  `value_type` STRING,
  `value_field` STRING,
  `status` INT,
  `syn_date` TIMESTAMP,
  `confirmer` STRING,
  `confirm_time` TIMESTAMP,
  `index_explain` STRING,
  `field_name` STRING,
  `tag_values` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '${topic}',
  'properties.bootstrap.servers' = '${bootstrap.servers}',
  'key.format' = 'json',
  'value.format' = 'json'
);

The output data looks like this:

{"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06 12:27:30","update_date":"2021-04-06 12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07 16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null}
Hi casel.
flink-cdc-connectors embeds the Debezium engine, so it should not support the Canal format:
https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07 > 16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null} > > > |
If that's really the goal, don't use flink-cdc. Just use the kafka connector directly with 'canal-json' as the format, as in the sketch below.
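That is, let an actual Canal server publish the table's binlog to a Kafka topic, then read that topic from Flink as a changelog source. A sketch, assuming such a topic already exists (the '${canal.topic}' placeholder and the group id are made up; the column list is abbreviated and should mirror `binlog_table`):

CREATE TABLE `canal_source` (
  `id` INT,
  `name` STRING,
  `sys_id` STRING,
  -- ... remaining columns exactly as in `binlog_table` ...
  `tag_values` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '${canal.topic}',
  'properties.bootstrap.servers' = '${bootstrap.servers}',
  'properties.group.id' = 'flink-canal-demo',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

Flink then interprets each Canal record as an INSERT/UPDATE/DELETE changelog row, and the messages in the topic keep the full Canal envelope because Canal itself produced them.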
{"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06 > 12:27:30","update_date":"2021-04-06 > 12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07 > 16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null} > > > |
I looked at the source code. Even if I switch the sink to the debezium-json format, I still can't get the original Debezium JSON data, because the serializer only writes three fields and drops the crucial database/table information. The other CDC formats I checked all have the same problem.
I'd like to know why. Tracing up into the Debezium emitRecords path, the record argument carries only the RowData and its RowKind; there is no table or database information. From DebeziumJsonSerializationSchema.java:

private static RowType createJsonRowType(DataType databaseSchema) {
    // Debezium JSON contains some other information, e.g. "source", "ts_ms",
    // but we don't need them.
    return (RowType)
            DataTypes.ROW(
                            DataTypes.FIELD("before", databaseSchema),
                            DataTypes.FIELD("after", databaseSchema),
                            DataTypes.FIELD("op", DataTypes.STRING()))
                    .getLogicalType();
}
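The row that reaches the sink format is indeed just RowData plus a RowKind, so by the time canal-json or debezium-json serializes it, the source/database/table metadata is already gone. One workaround is to bypass SQL for the source and use the DataStream API with a custom DebeziumDeserializationSchema: the raw SourceRecord still carries the topic name ("mysql_binlog_source.<db>.<table>") and the full Debezium envelope, so you can build whatever JSON shape you need before writing to Kafka. A minimal, untested sketch, assuming the 1.x com.alibaba.ververica.cdc packages (the host/database/table values are placeholders):

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;

public class RawBinlogJob {

    // Keeps the whole SourceRecord, including the topic
    // "mysql_binlog_source.<db>.<table>" and the Debezium "source" block,
    // instead of the three-field row the SQL formats see.
    public static class RawDeserializer implements DebeziumDeserializationSchema<String> {
        @Override
        public void deserialize(SourceRecord record, Collector<String> out) {
            // record.topic() encodes database and table; record.value() is the
            // full Debezium envelope (before/after/source/op/ts_ms). Convert it
            // to the JSON shape you need (e.g. Canal style) here.
            out.collect(record.topic() + " -> " + record.value());
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DebeziumSourceFunction<String> source = MySQLSource.<String>builder()
                .hostname("...")                   // placeholder
                .port(3306)
                .databaseList("mydb")              // hypothetical database name
                .tableList("mydb.binlog_table")    // hypothetical table name
                .username("...")
                .password("...")
                .deserializer(new RawDeserializer())
                .build();

        env.addSource(source)
           .print();                               // replace with a Kafka sink
        env.execute("raw-binlog");
    }
}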