flink 1.11 cdc: Kafka holds canal-json records for multiple tables that need to be parsed per table and sunk to different downstream systems. How can this be supported?

jindy_liu
For example:

MySQL tables:
CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  `time` datetime NOT NULL,
  `status` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `status` (
  `id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Data in Kafka:
// INSERT event for table `test`
{"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03
18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}

// event for table `status`
{"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}

Since the JSON in Kafka varies dynamically (for example, when a new table is added), how can each record be converted into the corresponding RowData?
It feels like neither JsonRowDeserializationSchema nor CanalJsonDeserializationSchema can handle this directly.

--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 cdc: Kafka holds canal-json records for multiple tables that need to be parsed per table and sunk to different downstream systems. How can this be supported?

godfrey he
A similar question came up in this thread:
http://apache-flink.147419.n8.nabble.com/flink-1-10-sql-kafka-format-json-schema-json-object-tt4665.html

Once https://issues.apache.org/jira/browse/FLINK-18002 is finished (planned for 1.12), you will be able to
declare fields whose format is not fixed, such as "data" and "mysqlType", as STRING,
and parse the corresponding JSON downstream with a UDF.
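
A minimal sketch of that approach (it assumes the 1.12 behavior from FLINK-18002; the table name canal_raw and the UDF parse_test_data are made up for illustration):

CREATE TABLE canal_raw (
  `database`  STRING,
  `table`     STRING,
  `type`      STRING,
  `data`      STRING,   -- raw JSON array, kept as a string
  `mysqlType` STRING    -- raw JSON object, kept as a string
) with (
  'connector' = 'kafka',
  'format' = 'json',
  ...
);

-- route events by table name; parse_test_data is a hypothetical UDF
-- that parses the raw `data` string into the columns of `test`
INSERT INTO downstream1
SELECT parse_test_data(`data`) FROM canal_raw WHERE `table` = 'test';

New tables then only need a new routing query plus a matching UDF, instead of a new deserialization schema.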


Best,
Godfrey

jindy_liu <[hidden email]> 于2020年7月21日周二 下午12:37写道:

> 例如:
>
> mysql表:
> CREATE TABLE `test` (
>   `id` int(11) NOT NULL,
>   `name` varchar(255) NOT NULL,
>   `time` datetime NOT NULL,
>   `status` int(11) NOT NULL,
>   PRIMARY KEY (`id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> CREATE TABLE `status` (
>   `id` int(11) NOT NULL,
>   `name` varchar(255) NOT NULL,
>   PRIMARY KEY (`id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> kafka中数据:
> // 表test 中insert事件
> {"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03
>
> 18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}
>
> //表status 中的事件
>
> {"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}
>
> 如何由于kafka中的json动态的变化的,比如新增一个表,如何能转成应对的RowData,
> 感觉无法直接用JsonRowDeserializationSchema或CanalJsonDeserializationSchema来做处理。
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 cdc: Kafka holds canal-json records for multiple tables that need to be parsed per table and sunk to different downstream systems. How can this be supported?

Jark
Hi,

At the moment, Flink SQL CDC cannot automatically discover new tables; you have to define each table's schema in advance and then submit the synchronization job. For your example above, you would need to define two
source tables:

CREATE TABLE `test` (
  `id` int,
  `name` string,
  `time` timestamp(3),
  `status` int
) with (
  'connector' = 'kafka',
  'format' = 'canal-json',
  ...
);

insert into downstream1 select * from `test`;


CREATE TABLE `status` (
  `id` int,
  `name` string
) with (
  'connector' = 'kafka',
  'format' = 'canal-json',
  ...
);

insert into downstream2 select * from `status`;
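
Note that both source tables read the same topic, so each job sees every table's events. If I remember correctly, the canal-json format gained table filtering options in later releases (1.12+, not in 1.11); if your version has them, a sketch would look like:

CREATE TABLE `status` (
  `id` int,
  `name` string
) with (
  'connector' = 'kafka',
  'format' = 'canal-json',
  -- only deserialize changelog events whose "table" field is 'status'
  'canal-json.table.include' = 'status',
  ...
);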


Best,
Jark