flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决?
flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? |
试着回答下这两个问题。
> flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决? 是的,关键问题是cdc connector为了保证数据一致性只能单并发,所以作业也只能单并发。这个需要cdc connector支持多并发读取,下游sink自然就能解决。 > flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? 这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致, 否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> 祝好, Leonard |
针对现在flink sql cdc下游并行度无法修改问题,是否可以分两步实现?谢谢! 1. flink sql cdc发到下游kafka,通过 upsert kafka connector,以debezium或canal格式,kafka topic开多个分区 2. 再从kafka消费,通过flink sql同步到最终mysql库 在 2021-06-08 19:49:40,"Leonard Xu" <[hidden email]> 写道: >试着回答下这两个问题。 > >> flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决? >是的,关键问题是cdc connector为了保证数据一致性只能单并发,所以作业也只能单并发。这个需要cdc connector支持多并发读取,下游sink自然就能解决。 > > >> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? > >这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致, >否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> > >祝好, >Leonard |
In reply to this post by casel.chen
1、有必要考虑其他方案了,如果是单表存量数据很大,且不说下游sink的问题,单单是snapshot阶段可能耗时过长,如果一旦失败,就只能整体重来(因为此时不能做checkpoint),任务的成功率就很值得怀疑(当然主要还看存量数据到底有多大)。另外,如果能获取全局锁还好,如果无法获取,则会锁表直到存量数据全部拷贝完毕,基本等于业务down掉。
2、如果只是简单的insert into xxx select xxx,就不用担心,runtime在遇到上下游并行度不一致时,如果有主键会按照主键hash的。 在 2021-06-08 14:05:17,"casel.chen" <[hidden email]> 写道: >flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决? >flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? |
单表存量数据的确很大,但业务方不需要同步存量,只需要同步增量就行
flink sql如何修改下游sink端的并行度呢?通过sql hint? 在 2021-06-11 11:08:58,"东东" <[hidden email]> 写道: >1、有必要考虑其他方案了,如果是单表存量数据很大,且不说下游sink的问题,单单是snapshot阶段可能耗时过长,如果一旦失败,就只能整体重来(因为此时不能做checkpoint),任务的成功率就很值得怀疑(当然主要还看存量数据到底有多大)。另外,如果能获取全局锁还好,如果无法获取,则会锁表直到存量数据全部拷贝完毕,基本等于业务down掉。 >2、如果只是简单的insert into xxx select xxx,就不用担心,runtime在遇到上下游并行度不一致时,如果有主键会按照主键hash的。 > > >在 2021-06-08 14:05:17,"casel.chen" <[hidden email]> 写道: >>flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决? >>flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? |
hi
sink 端可以通过 sink.parallelism 进行设置. ----- Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Best Wishes
JasonLee |
引用 Leonard Xu大佬之前的回答:
> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? 这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致, 否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> 说明加 sink.parallelism 是不行的 在 2021-06-11 15:44:51,"JasonLee" <[hidden email]> 写道: >hi > >sink 端可以通过 sink.parallelism 进行设置. > > > >----- >Best Wishes >JasonLee >-- >Sent from: http://apache-flink.147419.n8.nabble.com/ |
他这里列举的case是sink前发生了基于非pk的shuffle,比如说有join而且join条件不是主键,你的场景是怎样的呢? 另外,https://issues.apache.org/jira/browse/FLINK-20374 是sink到es,但es connector并不支持指定sink.parallelism,也就是说sink端的并行度必然为上游的并行度。而jdbc connector是可以指定sink.parallelism的,只要与上游的并行度不一致,runtime就会根据pk做hash shuffle,确保相同pk的记录发到同一个sink task。 在 2021-06-11 15:57:29,"casel.chen" <[hidden email]> 写道: >引用 Leonard Xu大佬之前的回答: > >> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? > >这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致, >否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> > >说明加 sink.parallelism 是不行的 > > > > > > > > > > > > > > >在 2021-06-11 15:44:51,"JasonLee" <[hidden email]> 写道: >>hi >> >>sink 端可以通过 sink.parallelism 进行设置. >> >> >> >>----- >>Best Wishes >>JasonLee >>-- >>Sent from: http://apache-flink.147419.n8.nabble.com/ |
我的场景就是简单的数据同步,没有join也没有group by,从一个mysql库同步到另一个mysql库。
上游写入数据速度很快,如果用flink sql同步的话默认只有一个并行度写,速度会跟不上,这种情况要怎么处理? 用的是flink 1.12.1 其jdbc connector还不支持sink.parallism参数 在 2021-06-11 16:32:00,"东东" <[hidden email]> 写道: > > > >他这里列举的case是sink前发生了基于非pk的shuffle,比如说有join而且join条件不是主键,你的场景是怎样的呢? > > > > >另外,https://issues.apache.org/jira/browse/FLINK-20374 是sink到es,但es connector并不支持指定sink.parallelism,也就是说sink端的并行度必然为上游的并行度。而jdbc connector是可以指定sink.parallelism的,只要与上游的并行度不一致,runtime就会根据pk做hash shuffle,确保相同pk的记录发到同一个sink task。 > > >在 2021-06-11 15:57:29,"casel.chen" <[hidden email]> 写道: >>引用 Leonard Xu大佬之前的回答: >> >>> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? >> >>这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致, >>否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> >> >>说明加 sink.parallelism 是不行的 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>在 2021-06-11 15:44:51,"JasonLee" <[hidden email]> 写道: >>>hi >>> >>>sink 端可以通过 sink.parallelism 进行设置. >>> >>> >>> >>>----- >>>Best Wishes >>>JasonLee >>>-- >>>Sent from: http://apache-flink.147419.n8.nabble.com/ |
1、升级到1.13 2、能不能追上要看写入量到底有多大,以及下游的处理能力啊,就是mysql自己的主从复制也不一定能确保追上,实践就知道了。 3、可以设置一下default.parallism试试,如果发现被chain到一起了,可以把operator chain关掉试试。 在 2021-06-11 18:57:36,"casel.chen" <[hidden email]> 写道: >我的场景就是简单的数据同步,没有join也没有group by,从一个mysql库同步到另一个mysql库。 >上游写入数据速度很快,如果用flink sql同步的话默认只有一个并行度写,速度会跟不上,这种情况要怎么处理? >用的是flink 1.12.1 其jdbc connector还不支持sink.parallism参数 > > > > > > > > > > > > > > > > > >在 2021-06-11 16:32:00,"东东" <[hidden email]> 写道: >> >> >> >>他这里列举的case是sink前发生了基于非pk的shuffle,比如说有join而且join条件不是主键,你的场景是怎样的呢? >> >> >> >> >>另外,https://issues.apache.org/jira/browse/FLINK-20374 是sink到es,但es connector并不支持指定sink.parallelism,也就是说sink端的并行度必然为上游的并行度。而jdbc connector是可以指定sink.parallelism的,只要与上游的并行度不一致,runtime就会根据pk做hash shuffle,确保相同pk的记录发到同一个sink task。 >> >> >>在 2021-06-11 15:57:29,"casel.chen" <[hidden email]> 写道: >>>引用 Leonard Xu大佬之前的回答: >>> >>>> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? >>> >>>这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致, >>>否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> >>> >>>说明加 sink.parallelism 是不行的 >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>>在 2021-06-11 15:44:51,"JasonLee" <[hidden email]> 写道: >>>>hi >>>> >>>>sink 端可以通过 sink.parallelism 进行设置. >>>> >>>> >>>> >>>>----- >>>>Best Wishes >>>>JasonLee >>>>-- >>>>Sent from: http://apache-flink.147419.n8.nabble.com/ |
即使下游sink能加大并行度,也不能确保上游同一个PK记录会流入到同一个task,也就无法保证操作同一条记录的顺序能正确replay,不是么?
在 2021-06-11 19:30:39,"东东" <[hidden email]> 写道: > > > >1、升级到1.13 >2、能不能追上要看写入量到底有多大,以及下游的处理能力啊,就是mysql自己的主从复制也不一定能确保追上,实践就知道了。 >3、可以设置一下default.parallism试试,如果发现被chain到一起了,可以把operator chain关掉试试。 > > >在 2021-06-11 18:57:36,"casel.chen" <[hidden email]> 写道: >>我的场景就是简单的数据同步,没有join也没有group by,从一个mysql库同步到另一个mysql库。 >>上游写入数据速度很快,如果用flink sql同步的话默认只有一个并行度写,速度会跟不上,这种情况要怎么处理? >>用的是flink 1.12.1 其jdbc connector还不支持sink.parallism参数 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>在 2021-06-11 16:32:00,"东东" <[hidden email]> 写道: >>> >>> >>> >>>他这里列举的case是sink前发生了基于非pk的shuffle,比如说有join而且join条件不是主键,你的场景是怎样的呢? >>> >>> >>> >>> >>>另外,https://issues.apache.org/jira/browse/FLINK-20374 是sink到es,但es connector并不支持指定sink.parallelism,也就是说sink端的并行度必然为上游的并行度。而jdbc connector是可以指定sink.parallelism的,只要与上游的并行度不一致,runtime就会根据pk做hash shuffle,确保相同pk的记录发到同一个sink task。 >>> >>> >>>在 2021-06-11 15:57:29,"casel.chen" <[hidden email]> 写道: >>>>引用 Leonard Xu大佬之前的回答: >>>> >>>>> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? >>>> >>>>这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致, >>>>否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> >>>> >>>>说明加 sink.parallelism 是不行的 >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>>在 2021-06-11 15:44:51,"JasonLee" <[hidden email]> 写道: >>>>>hi >>>>> >>>>>sink 端可以通过 sink.parallelism 进行设置. >>>>> >>>>> >>>>> >>>>>----- >>>>>Best Wishes >>>>>JasonLee >>>>>-- >>>>>Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by Leonard Xu
请问 flink sql cdc 场景下如何增大下游sink端并行度?
我试了修改default.parallism=2参数,并且将operator chain参数设置成false,并没有效果。 而后,我将作业分成两步:首先 源mysql cdc sink到 upsert kafka,再从 upsert kafka sink到 目标mysql。是想通过kafka partition增大sink并行度 初步测试效果是可以的,kafka建了3个partitions,每个partitions都按主键hash分配到数据,下游并行度跟partitions个数对齐。 以下是作业内容: -- source CREATE TABLE mysql_old_order_table ( order_number BIGINT, price DECIMAL, order_time TIMESTAMP(3), PRIMARY KEY (order_number) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'database-name' = 'flink-test', 'table-name' = 'old_order' ); -- sink CREATE TABLE kafka_order_table ( order_number BIGINT, price DECIMAL, order_time TIMESTAMP(3), PRIMARY KEY (order_number) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'order', 'properties.bootstrap.servers' = 'localhost:9092', 'key.format' = 'json', 'value.format' = 'json' ); -- insert INSERT INTO kafka_order_table SELECT * FROM mysql_old_order_table; -- source CREATE TABLE kafka_order_table ( order_number BIGINT, price DECIMAL, order_time TIMESTAMP(3), PRIMARY KEY (order_number) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'order', 'properties.bootstrap.servers' = 'localhost:9092', 'key.format' = 'json', 'value.format' = 'json' ); -- sink CREATE TABLE mysql_order_table ( order_number BIGINT, price DECIMAL, order_time TIMESTAMP(3), PRIMARY KEY (order_number) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/flink-test', 'table-name' = 'order', 'username' = 'root', 'password' = 'root', 'sink.buffer-flush.max-rows' = '3', 'sink.buffer-flush.interval' = '1s' ); -- insert INSERT INTO mysql_order_table SELECT * FROM kafka_order_table; 在 2021-06-08 19:49:40,"Leonard Xu" <[hidden email]> 写道: >试着回答下这两个问题。 > >> flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决? >是的,关键问题是cdc connector为了保证数据一致性只能单并发,所以作业也只能单并发。这个需要cdc connector支持多并发读取,下游sink自然就能解决。 > > >> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? > >这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致, >否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> > >祝好, >Leonard |
Free forum by Nabble | Edit this page |