这是我的代码,它仅仅把数据从 datagen source 写入到了 jdbc sink.
```java package main; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.EnvironmentSettings; public class Main { public static void main(String[] args) { StreamTableEnvironment tEnv = StreamTableEnvironment.create( StreamExecutionEnvironment.getExecutionEnvironment(), EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() ); tEnv.executeSql("CREATE TABLE gen_stuff (\n" + "\tstuff_id int,\n" + "\tstuff_base_id int,\n" + "\tstuff_name varchar(20)\n" + ") WITH (\n" + " 'connector' = 'datagen'," + "'rows-per-second'='10000000'," + "'fields.stuff_id.kind'='sequence'," + "'fields.stuff_id.start'='1'," + "'fields.stuff_id.end'='10000000'," + "'fields.stuff_name.length'='15'" + ")" ); tEnv.executeSql("CREATE TABLE result_stuff (\n" + "\tstuff_id int,\n" + "\tstuff_base_id int,\n" + "\tstuff_name varchar(20)\n" + ") WITH (\n" + "\t'connector' = 'jdbc',\n" + "\t'url' = 'jdbc:mysql://127.0.0.1:3306/test?rewritebatchedstatements=true',\n" + "\t'table-name' = 'result_stuff',\n" + "\t'username' = 'root',\n" + "\t'password' = ''\n" + ")" ); tEnv.executeSql("insert into result_stuff select stuff_id, stuff_base_id, stuff_name from gen_stuff"); } } ``` 然而,mysql 每秒大约只多 10000 条数据。如果按一条数据 20B 来算,写入速度是 200KB/s,这无法满足我的需求。。。 请问,是我哪里的配置有问题,还是有其它更好的写入数据库的方案,谢谢给出任何建议的人。 我使用的和 jdbc 有关的依赖如下: ```xml <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.21</version> </dependency> ``` (作为对比,在我的电脑上使用 datagen 生成数据,写入文件系统 sinker 的效率大约是 23MB/s) -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
每秒10000多条不算少了吧,如果还想再高一些,可以提高一下sink.buffer-flush.max-rows配置,默认是100
LittleFall <[hidden email]> 于2020年8月20日周四 下午7:56写道: > 这是我的代码,它仅仅把数据从 datagen source 写入到了 jdbc sink. > > ```java > package main; > > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > > public class Main { > > public static void main(String[] args) { > StreamTableEnvironment tEnv = StreamTableEnvironment.create( > StreamExecutionEnvironment.getExecutionEnvironment(), > > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > ); > > tEnv.executeSql("CREATE TABLE gen_stuff (\n" + > "\tstuff_id int,\n" + > "\tstuff_base_id int,\n" + > "\tstuff_name varchar(20)\n" + > ") WITH (\n" + > " 'connector' = 'datagen'," + > "'rows-per-second'='10000000'," + > "'fields.stuff_id.kind'='sequence'," + > "'fields.stuff_id.start'='1'," + > "'fields.stuff_id.end'='10000000'," + > "'fields.stuff_name.length'='15'" + > ")" > ); > tEnv.executeSql("CREATE TABLE result_stuff (\n" + > "\tstuff_id int,\n" + > "\tstuff_base_id int,\n" + > "\tstuff_name varchar(20)\n" + > ") WITH (\n" + > "\t'connector' = 'jdbc',\n" + > "\t'url' = > 'jdbc:mysql://127.0.0.1:3306/test?rewritebatchedstatements=true',\n" + > "\t'table-name' = 'result_stuff',\n" + > "\t'username' = 'root',\n" + > "\t'password' = ''\n" + > ")" > ); > > tEnv.executeSql("insert into result_stuff select stuff_id, > stuff_base_id, stuff_name from gen_stuff"); > } > } > ``` > > 然而,mysql 每秒大约只多 10000 条数据。如果按一条数据 20B 来算,写入速度是 200KB/s,这无法满足我的需求。。。 > > 请问,是我哪里的配置有问题,还是有其它更好的写入数据库的方案,谢谢给出任何建议的人。 > > 我使用的和 jdbc 有关的依赖如下: > > ```xml > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>mysql</groupId> > <artifactId>mysql-connector-java</artifactId> > <version>8.0.21</version> > </dependency> > ``` > > (作为对比,在我的电脑上使用 datagen 生成数据,写入文件系统 sinker 的效率大约是 23MB/s) > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li |
谢谢你的回复,它确实帮到了我。
我找到了另一个问题: rewriteBatchedStatements=true 应该是驼峰式,而我之前用了全小写,以至于按批写入不生效。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
哈喽,你现在写入效率是否增加,我也遇到了,感觉写入速度比较低的问题
| | 引领 | | [hidden email] | 签名由网易邮箱大师定制 在2020年08月20日 22:52,LittleFall<[hidden email]> 写道: 谢谢你的回复,它确实帮到了我。 我找到了另一个问题: rewriteBatchedStatements=true 应该是驼峰式,而我之前用了全小写,以至于按批写入不生效。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
jdbc connector是1.11的吗,我之前还是得自己封装,搞好复杂。batch+timeout+retry+metric等机制。
引领 <[hidden email]> 于2020年8月21日周五 上午9:41写道: > 哈喽,你现在写入效率是否增加,我也遇到了,感觉写入速度比较低的问题 > > > | | > 引领 > | > | > [hidden email] > | > 签名由网易邮箱大师定制 > > > 在2020年08月20日 22:52,LittleFall<[hidden email]> 写道: > 谢谢你的回复,它确实帮到了我。 > > 我找到了另一个问题: > > rewriteBatchedStatements=true > > 应该是驼峰式,而我之前用了全小写,以至于按批写入不生效。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by Benchao Li-2
赞,是的,MySQL单机每秒能写入10000条应该很多了,我之前做过一个业务需求,直接用Jdbc,而不是Flink jdbc connector写,按每批5000条数据,测下来性能也差不多10000条,这个应该是MYSQL的瓶颈,而不在connector这边
> -----原始邮件----- > 发件人: "Benchao Li" <[hidden email]> > 发送时间: 2020-08-20 22:11:53 (星期四) > 收件人: user-zh <[hidden email]> > 抄送: > 主题: Re: JDBC connector 写入 mysql 每秒大约只有 200KB 是正常的吗 > > 每秒10000多条不算少了吧,如果还想再高一些,可以提高一下sink.buffer-flush.max-rows配置,默认是100 > > LittleFall <[hidden email]> 于2020年8月20日周四 下午7:56写道: > > > 这是我的代码,它仅仅把数据从 datagen source 写入到了 jdbc sink. > > > > ```java > > package main; > > > > import > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > > import org.apache.flink.table.api.EnvironmentSettings; > > > > public class Main { > > > > public static void main(String[] args) { > > StreamTableEnvironment tEnv = StreamTableEnvironment.create( > > StreamExecutionEnvironment.getExecutionEnvironment(), > > > > > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > > ); > > > > tEnv.executeSql("CREATE TABLE gen_stuff (\n" + > > "\tstuff_id int,\n" + > > "\tstuff_base_id int,\n" + > > "\tstuff_name varchar(20)\n" + > > ") WITH (\n" + > > " 'connector' = 'datagen'," + > > "'rows-per-second'='10000000'," + > > "'fields.stuff_id.kind'='sequence'," + > > "'fields.stuff_id.start'='1'," + > > "'fields.stuff_id.end'='10000000'," + > > "'fields.stuff_name.length'='15'" + > > ")" > > ); > > tEnv.executeSql("CREATE TABLE result_stuff (\n" + > > "\tstuff_id int,\n" + > > "\tstuff_base_id int,\n" + > > "\tstuff_name varchar(20)\n" + > > ") WITH (\n" + > > "\t'connector' = 'jdbc',\n" + > > "\t'url' = > > 'jdbc:mysql://127.0.0.1:3306/test?rewritebatchedstatements=true',\n" + > > "\t'table-name' = 'result_stuff',\n" + > > "\t'username' = 'root',\n" + > > "\t'password' = ''\n" + > > ")" > > ); > > > > tEnv.executeSql("insert into result_stuff select stuff_id, > > stuff_base_id, stuff_name from gen_stuff"); > > } > > } > > ``` > > > > 然而,mysql 每秒大约只多 10000 条数据。如果按一条数据 20B 来算,写入速度是 200KB/s,这无法满足我的需求。。。 > > > > 请问,是我哪里的配置有问题,还是有其它更好的写入数据库的方案,谢谢给出任何建议的人。 > > > > 我使用的和 jdbc 有关的依赖如下: > > > > ```xml > > <dependency> > > <groupId>org.apache.flink</groupId> > > > > <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> > > <version>${flink.version}</version> > > </dependency> > > <dependency> > > <groupId>mysql</groupId> > > <artifactId>mysql-connector-java</artifactId> > > <version>8.0.21</version> > > </dependency> > > ``` > > > > (作为对比,在我的电脑上使用 datagen 生成数据,写入文件系统 sinker 的效率大约是 23MB/s) > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > > > > -- > > Best, > Benchao Li ------------------------------ 刘大龙 浙江大学 控制系 智能系统与控制研究所 工控新楼217 地址:浙江省杭州市浙大路38号浙江大学玉泉校区 Tel:18867547281 |
Free forum by Nabble | Edit this page |