flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

jindy_liu
场景:    canal解析binlog后,将db1实例内的多张表(表数据有关联)的变化发送到kafka的单topic,单分区中,从而保证有序;  
若我想做数据同步至另一个mysql实例db2中,怎么用flink sql操作多张表,同时保证表与表之间有序呢?  
例如mysql实例db1中有表test, statusCREATE TABLE `test` (  `id` int(11) NOT NULL,
`name` varchar(255) NOT NULL,  `time` datetime NOT NULL,  `status` int(11)
NOT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8CREATE
TABLE `status` (  `status` int(11) NOT NULL,  `name` varchar(255) NOT NULL,
PRIMARY KEY (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8比如,我用flink
sql,可以设置对应的一张test表,然后sink到mysql镜像实例db2的镜像表test,和表status做同步,但status表要怎么操作呢?如何保证有序?我目前能实现单表,确实方便,求助,多表的怎么做有序同步?CREATE
TABLE test (`id` INT,`name` VARCHAR(255),`time` TIMESTAMP(3),`status`
INT,PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='kafka',
'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
'properties.bootstrap.servers'='localhost:9092',
'scan.startup.mode'='earliest-offset', 'format'='canal-json',
'canal-json.ignore-parse-errors'='true');CREATE TABLE status (`status`
INT,`name` VARCHAR(255),PRIMARY KEY(name) NOT ENFORCED ) WITH (
'connector'='kafka', 'topic'='test',
'properties.group.id'='c_mysql_binlog_postgres',
'properties.bootstrap.servers'='localhost:9092',
'scan.startup.mode'='earliest-offset', 'format'='canal-json',
'canal-json.ignore-parse-errors'='true');



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

Jark
Administrator
Hi,

我想先问一下你使用的是刚发布的 1.11.0 版本吗? 还是自己 build 的 release-1.11 分支呢?

另外,我理解下你的需求是  db1.test 同步到 db2.test,  db1.status 同步到 db2.status?
多表的*有序*同步是指?
我理解你只需要像定义 db1.test -> db2.test 一样,定义好 db1.status binlog table 然后 insert
into 到 db2.status mysql table就行了。

感谢反馈使用体验。

Best,
Jark


On Wed, 8 Jul 2020 at 10:30, jindy_liu <[hidden email]> wrote:

> 场景:    canal解析binlog后,将db1实例内的多张表(表数据有关联)的变化发送到kafka的单topic,单分区中,从而保证有序;
> 若我想做数据同步至另一个mysql实例db2中,怎么用flink sql操作多张表,同时保证表与表之间有序呢?
> 例如mysql实例db1中有表test, statusCREATE TABLE `test` (  `id` int(11) NOT NULL,
> `name` varchar(255) NOT NULL,  `time` datetime NOT NULL,  `status` int(11)
> NOT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8CREATE
> TABLE `status` (  `status` int(11) NOT NULL,  `name` varchar(255) NOT
> NULL,
> PRIMARY KEY (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8比如,我用flink
>
> sql,可以设置对应的一张test表,然后sink到mysql镜像实例db2的镜像表test,和表status做同步,但status表要怎么操作呢?如何保证有序?我目前能实现单表,确实方便,求助,多表的怎么做有序同步?CREATE
> TABLE test (`id` INT,`name` VARCHAR(255),`time` TIMESTAMP(3),`status`
> INT,PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='kafka',
> 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
> 'properties.bootstrap.servers'='localhost:9092',
> 'scan.startup.mode'='earliest-offset', 'format'='canal-json',
> 'canal-json.ignore-parse-errors'='true');CREATE TABLE status (`status`
> INT,`name` VARCHAR(255),PRIMARY KEY(name) NOT ENFORCED ) WITH (
> 'connector'='kafka', 'topic'='test',
> 'properties.group.id'='c_mysql_binlog_postgres',
> 'properties.bootstrap.servers'='localhost:9092',
> 'scan.startup.mode'='earliest-offset', 'format'='canal-json',
> 'canal-json.ignore-parse-errors'='true');
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

Leonard Xu
Hello,
我理解下你场景:d1的 test 表 和 status 表两者之间有关联,比如外键,比如 test 更新一条数据后 status也需要级联地更新一条数据。
希望通过 Flink 的CDC功能同步这两张表到db2后,任意时刻,这两张表的状态是原子的(两张表对应 d1中两张表的一个快照版本), 是这种场景吗?

如果是这种场景,现在是还没有支持的。

Best,
Leonard Xu


> 在 2020年7月8日,11:59,Jark Wu <[hidden email]> 写道:
>
> Hi,
>
> 我想先问一下你使用的是刚发布的 1.11.0 版本吗? 还是自己 build 的 release-1.11 分支呢?
>
> 另外,我理解下你的需求是  db1.test 同步到 db2.test,  db1.status 同步到 db2.status?
> 多表的*有序*同步是指?
> 我理解你只需要像定义 db1.test -> db2.test 一样,定义好 db1.status binlog table 然后 insert
> into 到 db2.status mysql table就行了。
>
> 感谢反馈使用体验。
>
> Best,
> Jark
>
>
> On Wed, 8 Jul 2020 at 10:30, jindy_liu <[hidden email]> wrote:
>
>> 场景:    canal解析binlog后,将db1实例内的多张表(表数据有关联)的变化发送到kafka的单topic,单分区中,从而保证有序;
>> 若我想做数据同步至另一个mysql实例db2中,怎么用flink sql操作多张表,同时保证表与表之间有序呢?
>> 例如mysql实例db1中有表test, statusCREATE TABLE `test` (  `id` int(11) NOT NULL,
>> `name` varchar(255) NOT NULL,  `time` datetime NOT NULL,  `status` int(11)
>> NOT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8CREATE
>> TABLE `status` (  `status` int(11) NOT NULL,  `name` varchar(255) NOT
>> NULL,
>> PRIMARY KEY (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8比如,我用flink
>>
>> sql,可以设置对应的一张test表,然后sink到mysql镜像实例db2的镜像表test,和表status做同步,但status表要怎么操作呢?如何保证有序?我目前能实现单表,确实方便,求助,多表的怎么做有序同步?CREATE
>> TABLE test (`id` INT,`name` VARCHAR(255),`time` TIMESTAMP(3),`status`
>> INT,PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='kafka',
>> 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
>> 'properties.bootstrap.servers'='localhost:9092',
>> 'scan.startup.mode'='earliest-offset', 'format'='canal-json',
>> 'canal-json.ignore-parse-errors'='true');CREATE TABLE status (`status`
>> INT,`name` VARCHAR(255),PRIMARY KEY(name) NOT ENFORCED ) WITH (
>> 'connector'='kafka', 'topic'='test',
>> 'properties.group.id'='c_mysql_binlog_postgres',
>> 'properties.bootstrap.servers'='localhost:9092',
>> 'scan.startup.mode'='earliest-offset', 'format'='canal-json',
>> 'canal-json.ignore-parse-errors'='true');
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

jindy_liu
In reply to this post by Jark
https://github.com/apache/flink/tree/release-1.11.0我看github上tag下已经发布了release-1.11.0,我就编了下tag下的release-1.11.0。最近在做实时计算的一些调研,我们第一步就是要做数据的实时搬运(异构存储),看flink
1.11有cdc功能,我关注了下。看发布了就立即试用了下,看看能不能用你们这个做变化数据的实时同步。1、体验了下,若mysql的binlog按单表有序到kafka,单topic,单分区,flink
cdc的同步确实很方便,几条sql语句就搞定了。2、若mysql binlog按db实例,多表有序到kafka
单topic,单分区,感觉不知道要怎么样定义这个ddl,
同时怎么保证按序同步。(比如表与表之前的数据存在逻辑上的外键约束等等,具体来说test表的status字端就是个外键,如果关联记录都有更新,那更新顺序就比较重要了,要严格按binlog顺序来)。今天看了下,源码里canal-json的解析,好像只解析到了json里的feild
和 operate 类型。感觉这个多表有序的场景应该也是比较多的需求的。



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

jindy_liu
In reply to this post by Leonard Xu
对的,只不过生产中,有些设计的时候外键没有显示声明,都是用流程保证更新表的顺序。
所以消费数据变化的时候,也是要按顺序消费。不然使用镜像数据的人,可能会出问题。

求教:除flink sql 的cdc功能外,flink的其它特性能否较好的支持这种场景呢?  需要写再底层点的api吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

Leonard Xu
Hello,
 
很有意思的话题,我理解这需要保证多个CDC数据源 的全局一致性, 多个业务表的 bin-log 通过 cdc接入flink后,得保证 每个数据源的写入目标库的时候有一个全局一致性的保证,这个底层的APi应该也支持不了的。
一种可能的思路是 抽取cdc 记录 的metadata里的 committed ts (原始数据库中每次变更的时间, debezuim 的source.ts_ms字段, canal的es 字段),通过这个时间来协调 多个 CDC 数据源的处理速度,这只是我的一个想法。

不过可以确定的是,目前的API应该拿不到这个信息,现在的 Flink 框架没法处理这个数据, 可以看下 一些CDC框架是否能做这个事情。

Best,
Leonard Xu


> 在 2020年7月8日,13:18,jindy_liu <[hidden email]> 写道:
>
> 对的,只不过生产中,有些设计的时候外键没有显示声明,都是用流程保证更新表的顺序。
> 所以消费数据变化的时候,也是要按顺序消费。不然使用镜像数据的人,可能会出问题。
>
> 求教:除flink sql 的cdc功能外,flink的其它特性能否较好的支持这种场景呢?  需要写再底层点的api吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

jindy_liu
恩,主要是看flink 的发布里说flink
支持cdc了,感觉这个功能好像是我要的,感觉好像我要做的事情能用flink都搞定。就不用多个开源平台切换与维护多个组件了。

我原本还想先基于flink sql 将数据存量数据先全量导一次异构存储(如hbase, pgsql等)(批量),然后再flink cdc
把mysql的bin-log变化数据搬运到异构存储(如hbase,
pgsql等)后(增量),同时再镜像一份cdc后的kafka里的json数据到下游(变化通知)。

那么下游再基于镜像的kafka里的数据(变化)+异构的镜像数据,再基于flink去做一些实时计算的场景需求(比如最近一个月内的前多少名的数据等),不用都挤在mysql的从库在做一些分析了,并且有些分析也不适合在mysql上搞,一些olap类的。

但实际demo了吧,光一个数据的实时搬运里,要解决的问题还挺多的,光flink好像不太行(可能是我不太熟悉,我接触flink时间较短)
问题:
1、存量+实时数据怎么结合起来,目前语义上只能做到“至少一次”,先存量搬运,再binlog实时迁移,但难以定位存量搬运完后对应的kafka的起始消费位置。(但业务场景如果只需要“至少一次”,还是可以用的,业务大部分是只需“至少一次”)

2、db里多表有序:这里有kafka性能问题和有序保证问题;目前业务场景db表变化不太快,一天1百w行数据的变更,可以搞定,同时也可以按需的N张表有序,不用整个db实例里的全部表。但这个有序感觉用flink
sql cdc还不太好搞多表。如果直接写程序去消费

3、多sink怎么保证数据一致性:具体来说,在增量同步的时候,flink需要先sink 异构存储(先),后要sink
kafka(后),怎么保证两个sink的先后次序与原子性?

现请问下,flink 的sink能定义先后吗?
如上面的,将kafka里的canal-json数据取出后,能先写pgsql成功,再把json数据原封不动写kafka吗?如果目前不支持,可否自己改造下支持?





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

jindy_liu
In reply to this post by Leonard Xu
如果用变更时间来有序,可能会有一个问题,是如果数据变更太快了,两条先后数据的时间可能是一样的?

这个问题不知道是不是可以用bin-log的gtid来搞?至少是递增的?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

Leonard Xu

如果是同一个数据库(集群)的表,gtid应该是全局唯一且递增的,用gtid是更好的,异构的数据源就没有一个全局的id了,你可以试下. ^_^

祝好

> 在 2020年7月8日,15:32,jindy_liu <[hidden email]> 写道:
>
> 如果用变更时间来有序,可能会有一个问题,是如果数据变更太快了,两条先后数据的时间可能是一样的?
>
> 这个问题不知道是不是可以用bin-log的gtid来搞?至少是递增的?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/