For example:

MySQL tables:

CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  `time` datetime NOT NULL,
  `status` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `status` (
  `id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Data in Kafka:

// INSERT event for table `test`
{"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03 18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}

// event for table `status`
{"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}

Since the JSON in Kafka changes dynamically (for example, when a new table is added), how can it be converted into the corresponding RowData? It seems impossible to handle this directly with JsonRowDeserializationSchema or CanalJsonDeserializationSchema.

-- Sent from: http://apache-flink.147419.n8.nabble.com/
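Whatever mechanism ends up producing RowData has to dispatch on the `table` field of each canal-json envelope, because the shape of `data` differs per table. A minimal sketch of that dispatch in Python, illustrative only and not Flink API, using the two sample events above (abbreviated to the fields that matter):

```python
import json

# The two sample canal-json events, trimmed to the routing-relevant fields.
events = [
    '{"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03 18:04:22",'
    '"status":"0"}],"table":"test","type":"INSERT"}',
    '{"data":[{"id":"10","name":"status"}],"table":"status","type":"INSERT"}',
]

def route(raw):
    """Return (table, rows): the target table and its row dicts."""
    envelope = json.loads(raw)
    return envelope["table"], envelope["data"]

routed = dict(route(e) for e in events)

# The field sets differ per table, which is why one fixed-schema
# deserializer cannot cover both (let alone tables added later).
print(sorted(routed["test"][0]))    # ['id', 'name', 'status', 'time']
print(sorted(routed["status"][0]))  # ['id', 'name']
```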
http://apache-flink.147419.n8.nabble.com/flink-1-10-sql-kafka-format-json-schema-json-object-tt4665.html
A similar problem was discussed in that thread. Once https://issues.apache.org/jira/browse/FLINK-18002 is done (1.12), you will be able to declare fields whose format is not fixed, such as "data" and "mysqlType", as String, and parse the corresponding JSON downstream with your own UDF.

Best,
Godfrey

On Tue, Jul 21, 2020 at 12:37, jindy_liu <[hidden email]> wrote:
> [...]
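Godfrey's suggestion, sketched outside of Flink: declare the variable-shape fields as plain strings, then let a UDF-like function pull values out of the raw JSON downstream. The function name and behavior here are my own illustration, not a Flink built-in:

```python
import json

# Raw value of the "data" field, as it would arrive if the source
# table declared `data` as STRING (per FLINK-18002).
raw_data = ('[{"id":"1745","name":"jindy1745",'
            '"time":"2020-07-03 18:04:22","status":"0"}]')

def extract_field(raw, index, field):
    """What a scalar UDF could do: parse the JSON array and return
    one field of one row as a string, or None if the field is absent."""
    rows = json.loads(raw)
    return rows[index].get(field)

print(extract_field(raw_data, 0, "name"))    # jindy1745
print(extract_field(raw_data, 0, "status"))  # 0
```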
Hi,
Currently Flink SQL CDC cannot automatically discover new tables; you have to define each table's schema up front and then submit the sync job. For your example above, you would need to define two source tables:

CREATE TABLE `test` (
  `id` int,
  `name` string,
  `time` timestamp(3),
  `status` int
) with (
  'connector' = 'kafka',
  'format' = 'canal-json',
  ...
);
insert into downstream1 select * from `test`;

CREATE TABLE `status` (
  `id` int,
  `name` string
) with (
  'connector' = 'kafka',
  'format' = 'canal-json',
  ...
);
insert into downstream2 select * from `status`;

Best,
Jark

On Tue, 21 Jul 2020 at 15:19, godfrey he <[hidden email]> wrote:
> [...]
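Under this approach, the declared schema of each source table drives how the string values inside canal-json `data` are coerced into typed columns (e.g. `"0"` into an int, the datetime string into timestamp(3)). A rough sketch of that coercion in Python; this is illustrative only, not the `canal-json` format's actual implementation:

```python
import json
from datetime import datetime

# Typed schema of the `test` source table, mirroring the DDL above.
schema = {
    "id": int,
    "name": str,
    "time": lambda s: datetime.strptime(s, "%Y-%m-%d %H:%M:%S"),
    "status": int,
}

event = json.loads(
    '{"data":[{"id":"1745","name":"jindy1745",'
    '"time":"2020-07-03 18:04:22","status":"0"}],'
    '"table":"test","type":"INSERT"}'
)

def to_row(record, schema):
    """Coerce canal-json string values into the declared column types."""
    return {col: cast(record[col]) for col, cast in schema.items()}

row = to_row(event["data"][0], schema)
print(row["id"], row["status"], row["time"].year)  # 1745 0 2020
```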