Hi 社区。
Flink 1.12.1 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有 forword 的ETL没有作用。 insert into table_a select id,udf(a),b,c from table_b; 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗? ``` == Physical Execution Plan == Stage 1 : Data Source content : Source: TableSourceScan(table=[[default_catalog, default_database, temp_table]], fields=[id...]) Stage 3 : Operator content : ChangelogNormalize(key=[id]) ship_strategy : HASH Stage 4 : Operator content : Calc(select=[...]) ship_strategy : FORWARD Stage 5 : Data Sink content : Sink: Sink(table=[default_catalog.default_database.table_a], fields=[id...]) ship_strategy : FORWARD ``` |
Administrator
|
1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 forward。 Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate On Thu, 4 Mar 2021 at 15:30, Qishang <[hidden email]> wrote: > Hi 社区。 > Flink 1.12.1 > > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有 > forword 的ETL没有作用。 > > insert into table_a select id,udf(a),b,c from table_b; > > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗? > > ``` > == Physical Execution Plan == > Stage 1 : Data Source > content : Source: TableSourceScan(table=[[default_catalog, > default_database, temp_table]], fields=[id...]) > > Stage 3 : Operator > content : ChangelogNormalize(key=[id]) > ship_strategy : HASH > > Stage 4 : Operator > content : Calc(select=[...]) > ship_strategy : FORWARD > > Stage 5 : Data Sink > content : Sink: Sink(table=[default_catalog.default_database.table_a], > fields=[id...]) > ship_strategy : FORWARD > ``` > |
某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。
学到了,感谢。 Jark Wu <[hidden email]> 于2021年3月4日周四 下午11:11写道: > 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize > 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json > 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true > 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 > forward。 > > Best, > Jark > > [1]: > > https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate > > On Thu, 4 Mar 2021 at 15:30, Qishang <[hidden email]> wrote: > > > Hi 社区。 > > Flink 1.12.1 > > > > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition > ,设置大的并发,对于只有 > > forword 的ETL没有作用。 > > > > insert into table_a select id,udf(a),b,c from table_b; > > > > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 > > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? > > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗? > > > > ``` > > == Physical Execution Plan == > > Stage 1 : Data Source > > content : Source: TableSourceScan(table=[[default_catalog, > > default_database, temp_table]], fields=[id...]) > > > > Stage 3 : Operator > > content : ChangelogNormalize(key=[id]) > > ship_strategy : HASH > > > > Stage 4 : Operator > > content : Calc(select=[...]) > > ship_strategy : FORWARD > > > > Stage 5 : Data Sink > > content : Sink: Sink(table=[default_catalog.default_database.table_a], > > fields=[id...]) > > ship_strategy : FORWARD > > ``` > > > |
Hi Jark.
对于 upsert-kafka connector 有两个疑问: 1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* ` ,我试了下每次都是从 earliest 开始; 2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize 算子之后会变成2条,这个不是很理解? Qishang <[hidden email]> 于2021年3月5日周五 上午11:14写道: > > 某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。 > 学到了,感谢。 > > Jark Wu <[hidden email]> 于2021年3月4日周四 下午11:11写道: > >> 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize >> 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json >> 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true >> 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 >> forward。 >> >> Best, >> Jark >> >> [1]: >> >> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate >> >> On Thu, 4 Mar 2021 at 15:30, Qishang <[hidden email]> wrote: >> >> > Hi 社区。 >> > Flink 1.12.1 >> > >> > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition >> ,设置大的并发,对于只有 >> > forword 的ETL没有作用。 >> > >> > insert into table_a select id,udf(a),b,c from table_b; >> > >> > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 >> > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? >> > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka >> 中生效吗?可以用在我上面说的场景上面吗? >> > >> > ``` >> > == Physical Execution Plan == >> > Stage 1 : Data Source >> > content : Source: TableSourceScan(table=[[default_catalog, >> > default_database, temp_table]], fields=[id...]) >> > >> > Stage 3 : Operator >> > content : ChangelogNormalize(key=[id]) >> > ship_strategy : HASH >> > >> > Stage 4 : Operator >> > content : Calc(select=[...]) >> > ship_strategy : FORWARD >> > >> > Stage 5 : Data Sink >> > content : Sink: Sink(table=[default_catalog.default_database.table_a], >> > fields=[id...]) >> > ship_strategy : FORWARD >> > ``` >> > >> > |
Free forum by Nabble | Edit this page |