flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

casel.chen
目标是用flink作业实现类似canal server的功能


CREATE TABLE `binlog_table` (

                                `id` INT,

                                `name` STRING,

                                `sys_id` STRING,

                                `sequence` INT,

                                `filter` STRING,

                                `tag` STRING,

                                `remark` STRING,

                                `create_date` TIMESTAMP,

                                `update_date` TIMESTAMP,

                                `reserve` STRING,

                                `sys_name` STRING,

                                `metric_seq` INT,

                                `advanced_function` STRING,

                                `value_type` STRING,

                                `value_field` STRING,

                                `status` INT,

                                `syn_date` TIMESTAMP,

                                `confirmer` STRING,

                                `confirm_time` TIMESTAMP,

                                `index_explain` STRING,

                                `field_name` STRING,

                                `tag_values` STRING,

                                PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

      'connector' = 'mysql-cdc',

      'hostname' = '${mysql.hostname}',

      'port' = '3306',

      'username' = '${mysql.username}',

      'password' = '${mysql.password}',

      'database-name' = '${mysql.database}',

      'table-name' = '${mysql.table}'

      );




CREATE TABLE `kafka_sink` (

                              `id` INT,

                              `name` STRING,

                              `sys_id` STRING,

                              `sequence` INT,

                              `filter` STRING,

                              `tag` STRING,

                              `remark` STRING,

                              `create_date` TIMESTAMP,

                              `update_date` TIMESTAMP,

                              `reserve` STRING,

                              `sys_name` STRING,

                              `metric_seq` INT,

                              `advanced_function` STRING,

                              `value_type` STRING,

                              `value_field` STRING,

                              `status` INT,

                              `syn_date` TIMESTAMP,

                              `confirmer` STRING,

                              `confirm_time` TIMESTAMP,

                              `index_explain` STRING,

                              `field_name` STRING,

                              `tag_values` STRING,

                              PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

      'connector' = 'kafka',

      'topic' = '${topic}',

      'properties.bootstrap.servers' = '${bootstrap.servers}',

      'format' = 'canal-json'

      );




INSERT INTO `kafka_sink`

    (SELECT *

     FROM `binlog_table`);

出来的结果是这样:


{
"data": [
{
"id": 3,
"name": "自动付款接口BuyETC金额",
"sys_id": "0184",
"sequence": 2,
"filter": "(a=1)",
"tag": "MerId(商户号)",
"remark": "O",
"create_date": "2020-11-02 15:01:31",
"update_date": "2021-04-07 09:23:59",
"reserve": "",
"sys_name": "NHL",
"metric_seq": 0,
"advanced_function": "",
"value_type": "sum",
"value_field": "value",
"status": 1,
"syn_date": "2021-01-28 19:31:36",
"confirmer": null,
"confirm_time": null,
"index_explain": "aa",
"field_name": null,
"tag_values": null
}
],
"type": "INSERT"
}
并不是标准的canal json格式。改用upsert-kafka connector试了也不行



CREATE TABLE `kafka_sink` ( `id` INT, `name` STRING, `sys_id` STRING, `sequence` INT, `filter` STRING, `tag` STRING, `remark` STRING, `create_date` TIMESTAMP, `update_date` TIMESTAMP, `reserve` STRING, `sys_name` STRING, `metric_seq` INT, `advanced_function` STRING, `value_type` STRING, `value_field` STRING, `status` INT, `syn_date` TIMESTAMP, `confirmer` STRING, `confirm_time` TIMESTAMP, `index_explain` STRING, `field_name` STRING, `tag_values` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '${topic}', 'properties.bootstrap.servers' = '${bootstrap.servers}', 'key.format' = 'json',
'value.format' = 'json' );


出来的数据长这样



{"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06 12:27:30","update_date":"2021-04-06 12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07 16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null}


Reply | Threaded
Open this post in threaded view
|

Re: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

Qishang Zhong
Hi casel.
flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。

https://github.com/ververica/flink-cdc-connectors/blob/master/README.md

casel.chen <[hidden email]> 于2021年4月20日周二 下午6:18写道:

> 目标是用flink作业实现类似canal server的功能
>
>
> CREATE TABLE `binlog_table` (
>
>                                 `id` INT,
>
>                                 `name` STRING,
>
>                                 `sys_id` STRING,
>
>                                 `sequence` INT,
>
>                                 `filter` STRING,
>
>                                 `tag` STRING,
>
>                                 `remark` STRING,
>
>                                 `create_date` TIMESTAMP,
>
>                                 `update_date` TIMESTAMP,
>
>                                 `reserve` STRING,
>
>                                 `sys_name` STRING,
>
>                                 `metric_seq` INT,
>
>                                 `advanced_function` STRING,
>
>                                 `value_type` STRING,
>
>                                 `value_field` STRING,
>
>                                 `status` INT,
>
>                                 `syn_date` TIMESTAMP,
>
>                                 `confirmer` STRING,
>
>                                 `confirm_time` TIMESTAMP,
>
>                                 `index_explain` STRING,
>
>                                 `field_name` STRING,
>
>                                 `tag_values` STRING,
>
>                                 PRIMARY KEY (`id`) NOT ENFORCED
>
> ) WITH (
>
>       'connector' = 'mysql-cdc',
>
>       'hostname' = '${mysql.hostname}',
>
>       'port' = '3306',
>
>       'username' = '${mysql.username}',
>
>       'password' = '${mysql.password}',
>
>       'database-name' = '${mysql.database}',
>
>       'table-name' = '${mysql.table}'
>
>       );
>
>
>
>
> CREATE TABLE `kafka_sink` (
>
>                               `id` INT,
>
>                               `name` STRING,
>
>                               `sys_id` STRING,
>
>                               `sequence` INT,
>
>                               `filter` STRING,
>
>                               `tag` STRING,
>
>                               `remark` STRING,
>
>                               `create_date` TIMESTAMP,
>
>                               `update_date` TIMESTAMP,
>
>                               `reserve` STRING,
>
>                               `sys_name` STRING,
>
>                               `metric_seq` INT,
>
>                               `advanced_function` STRING,
>
>                               `value_type` STRING,
>
>                               `value_field` STRING,
>
>                               `status` INT,
>
>                               `syn_date` TIMESTAMP,
>
>                               `confirmer` STRING,
>
>                               `confirm_time` TIMESTAMP,
>
>                               `index_explain` STRING,
>
>                               `field_name` STRING,
>
>                               `tag_values` STRING,
>
>                               PRIMARY KEY (`id`) NOT ENFORCED
>
> ) WITH (
>
>       'connector' = 'kafka',
>
>       'topic' = '${topic}',
>
>       'properties.bootstrap.servers' = '${bootstrap.servers}',
>
>       'format' = 'canal-json'
>
>       );
>
>
>
>
> INSERT INTO `kafka_sink`
>
>     (SELECT *
>
>      FROM `binlog_table`);
>
> 出来的结果是这样:
>
>
> {
> "data": [
> {
> "id": 3,
> "name": "自动付款接口BuyETC金额",
> "sys_id": "0184",
> "sequence": 2,
> "filter": "(a=1)",
> "tag": "MerId(商户号)",
> "remark": "O",
> "create_date": "2020-11-02 15:01:31",
> "update_date": "2021-04-07 09:23:59",
> "reserve": "",
> "sys_name": "NHL",
> "metric_seq": 0,
> "advanced_function": "",
> "value_type": "sum",
> "value_field": "value",
> "status": 1,
> "syn_date": "2021-01-28 19:31:36",
> "confirmer": null,
> "confirm_time": null,
> "index_explain": "aa",
> "field_name": null,
> "tag_values": null
> }
> ],
> "type": "INSERT"
> }
> 并不是标准的canal json格式。改用upsert-kafka connector试了也不行
>
>
>
> CREATE TABLE `kafka_sink` ( `id` INT, `name` STRING, `sys_id` STRING,
> `sequence` INT, `filter` STRING, `tag` STRING, `remark` STRING,
> `create_date` TIMESTAMP, `update_date` TIMESTAMP, `reserve` STRING,
> `sys_name` STRING, `metric_seq` INT, `advanced_function` STRING,
> `value_type` STRING, `value_field` STRING, `status` INT, `syn_date`
> TIMESTAMP, `confirmer` STRING, `confirm_time` TIMESTAMP, `index_explain`
> STRING, `field_name` STRING, `tag_values` STRING, PRIMARY KEY (`id`) NOT
> ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '${topic}',
> 'properties.bootstrap.servers' = '${bootstrap.servers}', 'key.format' =
> 'json',
> 'value.format' = 'json' );
>
>
> 出来的数据长这样
>
>
>
> {"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06
> 12:27:30","update_date":"2021-04-06
> 12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07
> 16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null}
>
>
>
Reply | Threaded
Open this post in threaded view
|

回复: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

飞翔
真的话,就不用flink-cdc,你直接使用kafka,然后format 采用canal-json就好了。


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2021年4月21日(星期三) 下午2:16
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?



Hi casel.
flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。

https://github.com/ververica/flink-cdc-connectors/blob/master/README.md

casel.chen <[hidden email]&gt; 于2021年4月20日周二 下午6:18写道:

&gt; 目标是用flink作业实现类似canal server的功能
&gt;
&gt;
&gt; CREATE TABLE `binlog_table` (
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `id` INT,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `name` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sys_id` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sequence` INT,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `filter` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `tag` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `remark` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `create_date` TIMESTAMP,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `update_date` TIMESTAMP,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `reserve` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sys_name` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `metric_seq` INT,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `advanced_function` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `value_type` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `value_field` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `status` INT,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `syn_date` TIMESTAMP,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `confirmer` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `confirm_time` TIMESTAMP,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `index_explain` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `field_name` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `tag_values` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; PRIMARY KEY (`id`) NOT ENFORCED
&gt;
&gt; ) WITH (
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector' = 'mysql-cdc',
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'hostname' = '${mysql.hostname}',
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'port' = '3306',
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'username' = '${mysql.username}',
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'password' = '${mysql.password}',
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'database-name' = '${mysql.database}',
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'table-name' = '${mysql.table}'
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; );
&gt;
&gt;
&gt;
&gt;
&gt; CREATE TABLE `kafka_sink` (
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `id` INT,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `name` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sys_id` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sequence` INT,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `filter` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `tag` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `remark` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `create_date` TIMESTAMP,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `update_date` TIMESTAMP,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `reserve` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sys_name` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `metric_seq` INT,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `advanced_function` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `value_type` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `value_field` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `status` INT,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `syn_date` TIMESTAMP,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `confirmer` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `confirm_time` TIMESTAMP,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `index_explain` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `field_name` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `tag_values` STRING,
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; PRIMARY KEY (`id`) NOT ENFORCED
&gt;
&gt; ) WITH (
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector' = 'kafka',
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'topic' = '${topic}',
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'properties.bootstrap.servers' = '${bootstrap.servers}',
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'format' = 'canal-json'
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; );
&gt;
&gt;
&gt;
&gt;
&gt; INSERT INTO `kafka_sink`
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp; (SELECT *
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; FROM `binlog_table`);
&gt;
&gt; 出来的结果是这样:
&gt;
&gt;
&gt; {
&gt; "data": [
&gt; {
&gt; "id": 3,
&gt; "name": "自动付款接口BuyETC金额",
&gt; "sys_id": "0184",
&gt; "sequence": 2,
&gt; "filter": "(a=1)",
&gt; "tag": "MerId(商户号)",
&gt; "remark": "O",
&gt; "create_date": "2020-11-02 15:01:31",
&gt; "update_date": "2021-04-07 09:23:59",
&gt; "reserve": "",
&gt; "sys_name": "NHL",
&gt; "metric_seq": 0,
&gt; "advanced_function": "",
&gt; "value_type": "sum",
&gt; "value_field": "value",
&gt; "status": 1,
&gt; "syn_date": "2021-01-28 19:31:36",
&gt; "confirmer": null,
&gt; "confirm_time": null,
&gt; "index_explain": "aa",
&gt; "field_name": null,
&gt; "tag_values": null
&gt; }
&gt; ],
&gt; "type": "INSERT"
&gt; }
&gt; 并不是标准的canal json格式。改用upsert-kafka connector试了也不行
&gt;
&gt;
&gt;
&gt; CREATE TABLE `kafka_sink` ( `id` INT, `name` STRING, `sys_id` STRING,
&gt; `sequence` INT, `filter` STRING, `tag` STRING, `remark` STRING,
&gt; `create_date` TIMESTAMP, `update_date` TIMESTAMP, `reserve` STRING,
&gt; `sys_name` STRING, `metric_seq` INT, `advanced_function` STRING,
&gt; `value_type` STRING, `value_field` STRING, `status` INT, `syn_date`
&gt; TIMESTAMP, `confirmer` STRING, `confirm_time` TIMESTAMP, `index_explain`
&gt; STRING, `field_name` STRING, `tag_values` STRING, PRIMARY KEY (`id`) NOT
&gt; ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '${topic}',
&gt; 'properties.bootstrap.servers' = '${bootstrap.servers}', 'key.format' =
&gt; 'json',
&gt; 'value.format' = 'json' );
&gt;
&gt;
&gt; 出来的数据长这样
&gt;
&gt;
&gt;
&gt; {"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06
&gt; 12:27:30","update_date":"2021-04-06
&gt; 12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07
&gt; 16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null}
&gt;
&gt;
&gt;
Reply | Threaded
Open this post in threaded view
|

Re:回复: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

casel.chen
我看了源码,即使改换成debezium json格式输出,也得不到原本debezium json数据,因为输出字段只有有限的3个,没有关键的库表信息。而且看了其他几个cdc格式,都有类似的问题
想知道是为什么?追踪到上游debezium emitRecords方法,参数record就只有rowdata和rowkind信息,没有table和database


DebeziumJsonSerializationSchema.java


private static RowType createJsonRowType(DataType databaseSchema) {
        // Debezium JSON contains some other information, e.g. "source", "ts_ms"
        // but we don't need them.
        return (RowType)
                DataTypes.ROW(
                                DataTypes.FIELD("before", databaseSchema),
                                DataTypes.FIELD("after", databaseSchema),
                                DataTypes.FIELD("op", DataTypes.STRING()))
                        .getLogicalType();
    }

















在 2021-04-21 14:18:42,"飞翔" <[hidden email]> 写道:

>真的话,就不用flink-cdc,你直接使用kafka,然后format 采用canal-json就好了。
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
>发送时间:&nbsp;2021年4月21日(星期三) 下午2:16
>收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
>主题:&nbsp;Re: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?
>
>
>
>Hi casel.
>flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。
>
>https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
>
>casel.chen <[hidden email]&gt; 于2021年4月20日周二 下午6:18写道:
>
>&gt; 目标是用flink作业实现类似canal server的功能
>&gt;
>&gt;
>&gt; CREATE TABLE `binlog_table` (
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `id` INT,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `name` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sys_id` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sequence` INT,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `filter` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `tag` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `remark` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `create_date` TIMESTAMP,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `update_date` TIMESTAMP,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `reserve` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sys_name` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `metric_seq` INT,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `advanced_function` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `value_type` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `value_field` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `status` INT,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `syn_date` TIMESTAMP,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `confirmer` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `confirm_time` TIMESTAMP,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `index_explain` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `field_name` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `tag_values` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; PRIMARY KEY (`id`) NOT ENFORCED
>&gt;
>&gt; ) WITH (
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector' = 'mysql-cdc',
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'hostname' = '${mysql.hostname}',
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'port' = '3306',
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'username' = '${mysql.username}',
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'password' = '${mysql.password}',
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'database-name' = '${mysql.database}',
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'table-name' = '${mysql.table}'
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; );
>&gt;
>&gt;
>&gt;
>&gt;
>&gt; CREATE TABLE `kafka_sink` (
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `id` INT,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `name` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sys_id` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sequence` INT,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `filter` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `tag` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `remark` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `create_date` TIMESTAMP,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `update_date` TIMESTAMP,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `reserve` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `sys_name` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `metric_seq` INT,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `advanced_function` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `value_type` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `value_field` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `status` INT,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `syn_date` TIMESTAMP,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `confirmer` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `confirm_time` TIMESTAMP,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `index_explain` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `field_name` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `tag_values` STRING,
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; PRIMARY KEY (`id`) NOT ENFORCED
>&gt;
>&gt; ) WITH (
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector' = 'kafka',
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'topic' = '${topic}',
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'properties.bootstrap.servers' = '${bootstrap.servers}',
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'format' = 'canal-json'
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; );
>&gt;
>&gt;
>&gt;
>&gt;
>&gt; INSERT INTO `kafka_sink`
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp; (SELECT *
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; FROM `binlog_table`);
>&gt;
>&gt; 出来的结果是这样:
>&gt;
>&gt;
>&gt; {
>&gt; "data": [
>&gt; {
>&gt; "id": 3,
>&gt; "name": "自动付款接口BuyETC金额",
>&gt; "sys_id": "0184",
>&gt; "sequence": 2,
>&gt; "filter": "(a=1)",
>&gt; "tag": "MerId(商户号)",
>&gt; "remark": "O",
>&gt; "create_date": "2020-11-02 15:01:31",
>&gt; "update_date": "2021-04-07 09:23:59",
>&gt; "reserve": "",
>&gt; "sys_name": "NHL",
>&gt; "metric_seq": 0,
>&gt; "advanced_function": "",
>&gt; "value_type": "sum",
>&gt; "value_field": "value",
>&gt; "status": 1,
>&gt; "syn_date": "2021-01-28 19:31:36",
>&gt; "confirmer": null,
>&gt; "confirm_time": null,
>&gt; "index_explain": "aa",
>&gt; "field_name": null,
>&gt; "tag_values": null
>&gt; }
>&gt; ],
>&gt; "type": "INSERT"
>&gt; }
>&gt; 并不是标准的canal json格式。改用upsert-kafka connector试了也不行
>&gt;
>&gt;
>&gt;
>&gt; CREATE TABLE `kafka_sink` ( `id` INT, `name` STRING, `sys_id` STRING,
>&gt; `sequence` INT, `filter` STRING, `tag` STRING, `remark` STRING,
>&gt; `create_date` TIMESTAMP, `update_date` TIMESTAMP, `reserve` STRING,
>&gt; `sys_name` STRING, `metric_seq` INT, `advanced_function` STRING,
>&gt; `value_type` STRING, `value_field` STRING, `status` INT, `syn_date`
>&gt; TIMESTAMP, `confirmer` STRING, `confirm_time` TIMESTAMP, `index_explain`
>&gt; STRING, `field_name` STRING, `tag_values` STRING, PRIMARY KEY (`id`) NOT
>&gt; ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '${topic}',
>&gt; 'properties.bootstrap.servers' = '${bootstrap.servers}', 'key.format' =
>&gt; 'json',
>&gt; 'value.format' = 'json' );
>&gt;
>&gt;
>&gt; 出来的数据长这样
>&gt;
>&gt;
>&gt;
>&gt; {"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06
>&gt; 12:27:30","update_date":"2021-04-06
>&gt; 12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07
>&gt; 16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null}
>&gt;
>&gt;
>&gt;