dear all:
我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。 JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1) .setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/flink") .setUsername("root") .setPassword("123456") .setQuery(sql2) .setParameterTypes(types) .setBatchSize(1000) .build(); =========== 问题 ================ 如果上游数据来源时间是: 10:00 -> 900条 10:10 -> 120条 11:50 -> 1100条 15:00 -> 900条 JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是 10:10 -> 写入1000条,剩20条下次写入 11:50 -> 写入1000条,剩30条下次写入 15:00 -> 写入1000条,剩10条下次写入 我想要达到等待20分种,不满足batchSize也写入,能否实现? 10:10 -> 写入1000条,剩20条下次写入 10:30 -> 写入20条 11:50 -> 写入1000条,剩10条下次写入 12:10 -> 写入10条 15:20 -> 写入900条 thanks |
我理解现在就是你想要的效果。
batch-size和timeout两个条件是达到一个就会flush的。 nicygan <[hidden email]> 于2020年6月18日周四 下午5:05写道: > dear all: > 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。 > > > JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1) > .setDrivername("com.mysql.jdbc.Driver") > .setDBUrl("jdbc:mysql://localhost:3306/flink") > .setUsername("root") > .setPassword("123456") > .setQuery(sql2) > .setParameterTypes(types) > .setBatchSize(1000) > .build(); > > =========== 问题 ================ > 如果上游数据来源时间是: > 10:00 -> 900条 > 10:10 -> 120条 > 11:50 -> 1100条 > 15:00 -> 900条 > > JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是 > 10:10 -> 写入1000条,剩20条下次写入 > 11:50 -> 写入1000条,剩30条下次写入 > 15:00 -> 写入1000条,剩10条下次写入 > > 我想要达到等待20分种,不满足batchSize也写入,能否实现? > 10:10 -> 写入1000条,剩20条下次写入 > 10:30 -> 写入20条 > 11:50 -> 写入1000条,剩10条下次写入 > 12:10 -> 写入10条 > 15:20 -> 写入900条 > > thanks > -- Best, Benchao Li |
请问timeout值是多少?在哪里可设置? 在 2020-06-18 17:43:31,"Benchao Li" <[hidden email]> 写道: >我理解现在就是你想要的效果。 >batch-size和timeout两个条件是达到一个就会flush的。 > >nicygan <[hidden email]> 于2020年6月18日周四 下午5:05写道: > >> dear all: >> 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。 >> >> >> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1) >> .setDrivername("com.mysql.jdbc.Driver") >> .setDBUrl("jdbc:mysql://localhost:3306/flink") >> .setUsername("root") >> .setPassword("123456") >> .setQuery(sql2) >> .setParameterTypes(types) >> .setBatchSize(1000) >> .build(); >> >> =========== 问题 ================ >> 如果上游数据来源时间是: >> 10:00 -> 900条 >> 10:10 -> 120条 >> 11:50 -> 1100条 >> 15:00 -> 900条 >> >> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是 >> 10:10 -> 写入1000条,剩20条下次写入 >> 11:50 -> 写入1000条,剩30条下次写入 >> 15:00 -> 写入1000条,剩10条下次写入 >> >> 我想要达到等待20分种,不满足batchSize也写入,能否实现? >> 10:10 -> 写入1000条,剩20条下次写入 >> 10:30 -> 写入20条 >> 11:50 -> 写入1000条,剩10条下次写入 >> 12:10 -> 写入10条 >> 15:20 -> 写入900条 >> >> thanks >> > > >-- > >Best, >Benchao Li |
In reply to this post by nicygan
Hello
1.7.2是比较老的版本了, 可以考虑下升级新的版本,新的版本都支持你所需的功能的。 1.10.0 && 1.10.1 文档[1],对应的两个参数: 'connector.write.flush.max-rows' = '5000', -- optional, flush max size (includes all append, upsert and delete records), -- over this number of records, will flush data. The default value is "5000". 'connector.write.flush.interval' = '2s', --optional, flush interval mills, over this time, asynchronous threads will flush data. 如果使用1.10版本,推荐使用1.10.1版本,1.10.1在1.10.0的基础上修复了一些bug。 社区即将发布的1.11.0 文档[2], 对应的两个参数: sink.buffer-flush.max-rows -- The max size of buffered records before flush. Can be set to zero to disable it. sink.buffer-flush.interval -- The flush interval mills, over this time, asynchronous threads will flush data. -- Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' -- with the flush interval set allowing for complete async processing of buffered actions. Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#connector-options> > 在 2020年6月18日,17:04,nicygan <[hidden email]> 写道: > > dear all: > 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。 > > > JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1) > .setDrivername("com.mysql.jdbc.Driver") > .setDBUrl("jdbc:mysql://localhost:3306/flink") > .setUsername("root") > .setPassword("123456") > .setQuery(sql2) > .setParameterTypes(types) > .setBatchSize(1000) > .build(); > > =========== 问题 ================ > 如果上游数据来源时间是: > 10:00 -> 900条 > 10:10 -> 120条 > 11:50 -> 1100条 > 15:00 -> 900条 > > JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是 > 10:10 -> 写入1000条,剩20条下次写入 > 11:50 -> 写入1000条,剩30条下次写入 > 15:00 -> 写入1000条,剩10条下次写入 > > 我想要达到等待20分种,不满足batchSize也写入,能否实现? > 10:10 -> 写入1000条,剩20条下次写入 > 10:30 -> 写入20条 > 11:50 -> 写入1000条,剩10条下次写入 > 12:10 -> 写入10条 > 15:20 -> 写入900条 > > thanks |
Free forum by Nabble | Edit this page |