flink 1.11 DDL: problem writing to MySQL

flink 1.11 DDL: problem writing to MySQL

曹武
I'm using Flink 1.11.1 for CDC and see only about 100 rows per second being written to MySQL. Is there a way to optimize this, or are there other batch-write approaches you would suggest?
The code is as follows:
        String sourceDdl = " CREATE TABLE debezium_source " +
                "( " +
                "id STRING NOT NULL, name STRING, description STRING, weight Double" +
                ") " +
                "WITH (" +
                " 'connector' = 'kafka-0.11'," +
                " 'topic' = 'test0717'," +
                " 'properties.bootstrap.servers' = ' 172.22.20.206:9092', " +
                "'scan.startup.mode' = 'group-offsets', 'properties.group.id' = 'test'," +
                "'format' = 'debezium-json'," +
                "'debezium-json.schema-include' = 'false'," +
                "'debezium-json.ignore-parse-errors' = 'true')";
        tEnv.executeSql(sourceDdl);
        System.out.println("init source ddl successful ==>" + sourceDdl);

        String sinkDdl = " CREATE TABLE sink " +
                "( " +
                "id STRING NOT NULL," +
                " name STRING, " +
                "description STRING," +
                " weight Double," +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ")" +
                " WITH " +
                "( " +
                "'connector' = 'jdbc', " +
                "'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true', " +
                "'table-name' = 'table-out', " +
                "'driver' = 'com.mysql.cj.jdbc.Driver'," +
                "'sink.buffer-flush.interval' = '1s'," +
                "'sink.buffer-flush.max-rows' = '1000'," +
                "'username' = 'DataPip', " +
                "'password' = 'DataPip')";
        tEnv.executeSql(sinkDdl);
        System.out.println("init sink ddl successful ==>" + sinkDdl);

        String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source";
        System.out.println("execute dml ==>" + dml);
        tEnv.executeSql(dml);

        tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print') " +
                "LIKE debezium_source (EXCLUDING ALL)");
        tEnv.executeSql("INSERT INTO print_table SELECT id, name, description, weight FROM debezium_source");




Re: flink 1.11 DDL: problem writing to MySQL

godfrey he
Have you observed backpressure caused by the sink not keeping up?
Alternatively, try increasing the flush interval so that each buffer accumulates more data.
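
For illustration, a minimal sketch of that change applied to the sink DDL from the first post; the 5s / 5000 values are assumptions chosen to show the direction, not tuned recommendations:

        String sinkDdl = " CREATE TABLE sink " +
                "( id STRING NOT NULL, name STRING, description STRING, weight Double, " +
                "  PRIMARY KEY (id) NOT ENFORCED ) " +
                "WITH ( " +
                "'connector' = 'jdbc', " +
                "'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true', " +
                "'table-name' = 'table-out', " +
                "'driver' = 'com.mysql.cj.jdbc.Driver', " +
                // larger buffering: flush at most once every 5 seconds or every 5000
                // rows, so each JDBC batch carries more rows (values are assumptions)
                "'sink.buffer-flush.interval' = '5s', " +
                "'sink.buffer-flush.max-rows' = '5000', " +
                "'username' = 'DataPip', " +
                "'password' = 'DataPip')";
        tEnv.executeSql(sinkDdl);

If throughput stays around 100 rows per second even with the larger buffer, the JDBC sink is probably not the bottleneck.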


Re: flink 1.11 DDL: problem writing to MySQL

Jark
Administrator
What rate is the Kafka source producing data at? Could it be that the source itself only produces about 100 records per second?
Btw, checking the backpressure status is a good way to troubleshoot this.
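
If it helps to rule that out, here is a rough, hypothetical sketch (not part of this thread) that estimates the topic's production rate by sampling end offsets twice with a plain kafka-clients consumer; the broker address and topic name are taken from the DDL above, and the class name is made up:

    import java.util.List;
    import java.util.Properties;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class TopicRateCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "172.22.20.206:9092");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                List<TopicPartition> partitions = consumer.partitionsFor("test0717").stream()
                        .map(p -> new TopicPartition(p.topic(), p.partition()))
                        .collect(Collectors.toList());

                // Sum of end offsets across all partitions, sampled 10 seconds apart;
                // the difference divided by 10 approximates produced records/second.
                long first = sumEndOffsets(consumer, partitions);
                Thread.sleep(10_000);
                long second = sumEndOffsets(consumer, partitions);

                System.out.println("approx. produced records/sec: " + (second - first) / 10.0);
            }
        }

        private static long sumEndOffsets(KafkaConsumer<byte[], byte[]> consumer,
                                          List<TopicPartition> partitions) {
            return consumer.endOffsets(partitions).values().stream()
                    .mapToLong(Long::longValue).sum();
        }
    }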
