I'm using Flink 1.11.1 for CDC and find it only writes about 100 records per second to MySQL. Are there any ways to optimize this, or suggestions for other batch-write approaches?
The code is as follows:

        String sourceDdl = " CREATE TABLE debezium_source " +
                "( " +
                "id STRING NOT NULL, name STRING, description STRING, weight Double" +
                ") " +
                "WITH (" +
                " 'connector' = 'kafka-0.11'," +
                " 'topic' = 'test0717'," +
                " 'properties.bootstrap.servers' = '172.22.20.206:9092', " +
                "'scan.startup.mode' = 'group-offsets','properties.group.id'='test'," +
                "'format' = 'debezium-json'," +
                "'debezium-json.schema-include'='false'," +
                "'debezium-json.ignore-parse-errors'='true')";
        tEnv.executeSql(sourceDdl);
        System.out.println("init source ddl successful ==>" + sourceDdl);

        String sinkDdl = " CREATE TABLE sink " +
                "( " +
                "id STRING NOT NULL," +
                " name STRING, " +
                "description STRING," +
                " weight Double," +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ")" +
                " WITH " +
                "( " +
                "'connector' = 'jdbc', " +
                "'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true', " +
                "'table-name' = 'table-out', " +
                "'driver'= 'com.mysql.cj.jdbc.Driver'," +
                "'sink.buffer-flush.interval'='1s'," +
                "'sink.buffer-flush.max-rows'='1000'," +
                "'username'='DataPip', " +
                "'password'='DataPip')";
        tEnv.executeSql(sinkDdl);
        System.out.println("init sink ddl successful ==>" + sinkDdl);

        String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source";
        System.out.println("execute dml ==>" + dml);
        tEnv.executeSql(dml);

        tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print')" +
                "LIKE debezium_source (EXCLUDING ALL)");
        tEnv.executeSql("INSERT INTO print_table SELECT id, name, description, weight FROM debezium_source");
Have you observed backpressure caused by the sink not being able to keep up?
You could also try increasing the flush interval so that each buffer accumulates more data before it is written.
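As a rough sketch of that suggestion: the sink DDL below raises the flush thresholds and adds rewriteBatchedStatements=true to the JDBC URL so MySQL Connector/J can collapse batches into multi-row statements. The specific values (5 s / 5000 rows) and whether the driver can rewrite the upsert statements Flink generates are assumptions to test against your workload, not guaranteed settings.

        // Sketch only: larger buffers mean fewer, bigger JDBC batches;
        // tune the values against your latency requirements.
        String tunedSinkDdl = " CREATE TABLE sink " +
                "( " +
                "id STRING NOT NULL," +
                " name STRING, " +
                "description STRING," +
                " weight Double," +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH ( " +
                "'connector' = 'jdbc', " +
                // assumption: rewriteBatchedStatements=true lets Connector/J batch inserts at the SQL level
                "'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true&rewriteBatchedStatements=true', " +
                "'table-name' = 'table-out', " +
                "'driver' = 'com.mysql.cj.jdbc.Driver'," +
                // assumption: flush every 5 s or every 5000 rows, whichever comes first
                "'sink.buffer-flush.interval' = '5s'," +
                "'sink.buffer-flush.max-rows' = '5000'," +
                "'username' = 'DataPip', " +
                "'password' = 'DataPip')";
        tEnv.executeSql(tunedSinkDdl);

Whether rewriteBatchedStatements helps for the keyed upsert writes depends on the Connector/J version, so measure the throughput before and after rather than assuming a win.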
What is the rate at which the Kafka source topic is producing data? Could it be that the source itself only produces about 100 records per second?
Btw, checking the backpressure status (e.g. in the Flink web UI) is a good way to narrow this down.
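One rough way to answer the source-rate question is to watch how fast the topic's end offsets grow. The sketch below is an illustration only; the throwaway consumer group, the broker address taken from the job above, and the 10-second sampling window are assumptions.

        import java.util.List;
        import java.util.Properties;
        import java.util.stream.Collectors;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.StringDeserializer;

        public class TopicRateProbe {
            public static void main(String[] args) throws InterruptedException {
                Properties props = new Properties();
                props.put("bootstrap.servers", "172.22.20.206:9092"); // broker from the job above
                props.put("group.id", "rate-probe");                  // throwaway group, no offsets committed
                props.put("key.deserializer", StringDeserializer.class.getName());
                props.put("value.deserializer", StringDeserializer.class.getName());

                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    List<TopicPartition> partitions = consumer.partitionsFor("test0717").stream()
                            .map(p -> new TopicPartition(p.topic(), p.partition()))
                            .collect(Collectors.toList());

                    long before = sumEndOffsets(consumer, partitions);
                    Thread.sleep(10_000); // sampling window; adjust as needed
                    long after = sumEndOffsets(consumer, partitions);

                    System.out.printf("topic grows by roughly %.1f records/second%n", (after - before) / 10.0);
                }
            }

            // End offsets grow by one per appended record, so their delta over time
            // approximates the production rate of the topic.
            private static long sumEndOffsets(KafkaConsumer<String, String> consumer,
                                              List<TopicPartition> partitions) {
                return consumer.endOffsets(partitions).values().stream()
                        .mapToLong(Long::longValue).sum();
            }
        }

If the topic itself only grows by ~100 records per second, the sink is not the bottleneck and no amount of batching will change the observed write rate.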