I recently have a requirement: use Flink to consume the MySQL binlog and send the change records to a downstream Kafka topic (similar to what canal server or Debezium does). The downstream Kafka messages need to carry the before, after, op_type, ts, database, and table fields. I tried the script below and found that the resulting Kafka messages only carry the data and op_type information; none of the other fields come through. Tracing upstream, the records emitted by Debezium (Flink CDC is implemented on top of Debezium) already carry only the data and op_type information. Is there any other way to get hold of the original change record?
CREATE TABLE `binlog_table` (
    `id` INT,
    `name` STRING,
    `sys_id` STRING,
    `sequence` INT,
    `filter` STRING,
    `tag` STRING,
    `remark` STRING,
    `create_date` TIMESTAMP,
    `update_date` TIMESTAMP,
    `reserve` STRING,
    `sys_name` STRING,
    `metric_seq` INT,
    `advanced_function` STRING,
    `value_type` STRING,
    `value_field` STRING,
    `status` INT,
    `syn_date` TIMESTAMP,
    `confirmer` STRING,
    `confirm_time` TIMESTAMP,
    `index_explain` STRING,
    `field_name` STRING,
    `tag_values` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '${mysql.hostname}',
    'port' = '3306',
    'username' = '${mysql.username}',
    'password' = '${mysql.password}',
    'database-name' = '${mysql.database}',
    'table-name' = '${mysql.table}'
);

CREATE TABLE `kafka_sink` (
    `id` INT,
    `name` STRING,
    `sys_id` STRING,
    `sequence` INT,
    `filter` STRING,
    `tag` STRING,
    `remark` STRING,
    `create_date` TIMESTAMP,
    `update_date` TIMESTAMP,
    `reserve` STRING,
    `sys_name` STRING,
    `metric_seq` INT,
    `advanced_function` STRING,
    `value_type` STRING,
    `value_field` STRING,
    `status` INT,
    `syn_date` TIMESTAMP,
    `confirmer` STRING,
    `confirm_time` TIMESTAMP,
    `index_explain` STRING,
    `field_name` STRING,
    `tag_values` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '${topic}',
    'properties.bootstrap.servers' = '${bootstrap.servers}',
    'format' = 'canal-json'
);

INSERT INTO `kafka_sink` (SELECT * FROM `binlog_table`);
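For reference, the raw Debezium record can still be inspected at the DataStream level, before any SQL format flattens it. A minimal sketch, assuming the flink-cdc 1.x API (MySQLSource and StringDebeziumDeserializationSchema from the com.alibaba.ververica artifacts); the connection parameters are placeholders:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class RawBinlogJob {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection values; substitute the real ${mysql.*} settings.
        SourceFunction<String> source = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.binlog_table")
                .username("user")
                .password("pass")
                // Emits SourceRecord.toString(): the full Debezium envelope,
                // including before/after images, op, ts_ms, database and table.
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();  // replace print() with a Kafka sink in a real job
        env.execute("raw-binlog-inspection");
    }
}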
Reply from 飞翔, 2021-04-22 11:01:

In that case, why use Flink to sync the data at all, if it loses the original information along the way? You can use native Debezium or canal directly to sync the data and send it to Kafka. Take canal's sample output: the after image there is not complete, but you can reconstruct and fill it in yourself. Or simply use Debezium, which is exactly why flink-cdc integrated Debezium in the first place: both the before and after images are complete records.
Reply from the original poster, 2021-04-22 14:32:

My question is precisely why Flink CDC, having integrated Debezium, loses the original information. Using native Debezium or canal to sync the data would certainly work, but if Flink CDC could emit it directly, wouldn't that save those extra components and their operational overhead? That is also what Flink CDC was designed for in the first place.
Further reply:

First: flink-cdc can get hold of the SourceRecord, and the SourceRecord carries the topic name. Second: if you trace how Debezium integrates its MySQL connector, it is still implemented on top of Kafka Connect, so the topic is generated automatically. The generation rule (see the official documentation) is service name + database + table, so the database and table can be recovered by parsing the topic back apart. Of course, you should debug locally first to confirm that the topic inside the SourceRecord really has this form.
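A minimal sketch of that approach, assuming the flink-cdc 1.x DebeziumDeserializationSchema interface (com.alibaba.ververica artifacts; later releases moved it to com.ververica) and the standard Debezium envelope field names (before, after, op, ts_ms, source):

import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class EnvelopeDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        // Debezium names the topic <serverName>.<database>.<table>, so the
        // database and table can be recovered by splitting it. (Verify locally,
        // as suggested above.)
        String[] parts = record.topic().split("\\.");
        String database = parts.length > 1 ? parts[1] : "";
        String table = parts.length > 2 ? parts[2] : "";

        // The record value is the full Debezium envelope. Its "source" struct
        // also carries "db" and "table", as an alternative to topic parsing.
        Struct value = (Struct) record.value();
        Struct before = value.getStruct("before");  // null for inserts
        Struct after = value.getStruct("after");    // null for deletes
        String op = value.getString("op");
        Long tsMs = value.getInt64("ts_ms");

        // Simplified output: Struct.toString() is not JSON; a real job would
        // serialize before/after with a JSON library.
        out.collect(String.format(
                "{\"database\":\"%s\",\"table\":\"%s\",\"op_type\":\"%s\",\"ts\":%d,"
                        + "\"before\":%s,\"after\":%s}",
                database, table, op, tsMs, before, after));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Wiring this in via MySQLSource.builder().deserializer(new EnvelopeDeserializationSchema()) would then yield one message per change carrying the before, after, op_type, ts, database and table fields the original question asked for.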