flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

nicygan
dear all:
 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。


JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/flink")
                .setUsername("root")
                .setPassword("123456")
                .setQuery(sql2)
                .setParameterTypes(types)
 .setBatchSize(1000)
               .build();

=========== 问题 ================
如果上游数据来源时间是:
10:00 -> 900条
10:10 -> 120条
11:50 -> 1100条
15:00 -> 900条

JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
10:10 -> 写入1000条,剩20条下次写入
11:50 -> 写入1000条,剩30条下次写入
15:00 -> 写入1000条,剩10条下次写入

我想要达到等待20分种,不满足batchSize也写入,能否实现?
10:10 -> 写入1000条,剩20条下次写入
10:30 -> 写入20条
11:50 -> 写入1000条,剩10条下次写入
12:10 -> 写入10条
15:20 -> 写入900条

thanks
Reply | Threaded
Open this post in threaded view
|

Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

Benchao Li-2
我理解现在就是你想要的效果。
batch-size和timeout两个条件是达到一个就会flush的。

nicygan <[hidden email]> 于2020年6月18日周四 下午5:05写道:

> dear all:
>  我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>
>
> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
>                 .setDrivername("com.mysql.jdbc.Driver")
>                 .setDBUrl("jdbc:mysql://localhost:3306/flink")
>                 .setUsername("root")
>                 .setPassword("123456")
>                 .setQuery(sql2)
>                 .setParameterTypes(types)
>  .setBatchSize(1000)
>                .build();
>
> =========== 问题 ================
> 如果上游数据来源时间是:
> 10:00 -> 900条
> 10:10 -> 120条
> 11:50 -> 1100条
> 15:00 -> 900条
>
> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
> 10:10 -> 写入1000条,剩20条下次写入
> 11:50 -> 写入1000条,剩30条下次写入
> 15:00 -> 写入1000条,剩10条下次写入
>
> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
> 10:10 -> 写入1000条,剩20条下次写入
> 10:30 -> 写入20条
> 11:50 -> 写入1000条,剩10条下次写入
> 12:10 -> 写入10条
> 15:20 -> 写入900条
>
> thanks
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re:Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

nicygan



请问timeout值是多少?在哪里可设置?



在 2020-06-18 17:43:31,"Benchao Li" <[hidden email]> 写道:

>我理解现在就是你想要的效果。
>batch-size和timeout两个条件是达到一个就会flush的。
>
>nicygan <[hidden email]> 于2020年6月18日周四 下午5:05写道:
>
>> dear all:
>>  我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>>
>>
>> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
>>                 .setDrivername("com.mysql.jdbc.Driver")
>>                 .setDBUrl("jdbc:mysql://localhost:3306/flink")
>>                 .setUsername("root")
>>                 .setPassword("123456")
>>                 .setQuery(sql2)
>>                 .setParameterTypes(types)
>>  .setBatchSize(1000)
>>                .build();
>>
>> =========== 问题 ================
>> 如果上游数据来源时间是:
>> 10:00 -> 900条
>> 10:10 -> 120条
>> 11:50 -> 1100条
>> 15:00 -> 900条
>>
>> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
>> 10:10 -> 写入1000条,剩20条下次写入
>> 11:50 -> 写入1000条,剩30条下次写入
>> 15:00 -> 写入1000条,剩10条下次写入
>>
>> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
>> 10:10 -> 写入1000条,剩20条下次写入
>> 10:30 -> 写入20条
>> 11:50 -> 写入1000条,剩10条下次写入
>> 12:10 -> 写入10条
>> 15:20 -> 写入900条
>>
>> thanks
>>
>
>
>--
>
>Best,
>Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

Leonard Xu
In reply to this post by nicygan
Hello

1.7.2是比较老的版本了, 可以考虑下升级新的版本,新的版本都支持你所需的功能的。

1.10.0 && 1.10.1 文档[1],对应的两个参数:

  'connector.write.flush.max-rows' = '5000', -- optional, flush max size (includes all append, upsert and delete records),
                                                                         -- over this number of records, will flush data. The default value is "5000".
  'connector.write.flush.interval' = '2s',         --optional, flush interval mills, over this time, asynchronous threads will flush data.
如果使用1.10版本,推荐使用1.10.1版本,1.10.1在1.10.0的基础上修复了一些bug。

社区即将发布的1.11.0 文档[2], 对应的两个参数:
sink.buffer-flush.max-rows -- The max size of buffered records before flush. Can be set to zero to disable it.
sink.buffer-flush.interval -- The flush interval mills, over this time, asynchronous threads will flush data.
                                                -- Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0'
                                                -- with the flush interval set allowing for complete async processing of buffered actions.

Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector>
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#connector-options>


> 在 2020年6月18日,17:04,nicygan <[hidden email]> 写道:
>
> dear all:
> 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>
>
> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
>                .setDrivername("com.mysql.jdbc.Driver")
>                .setDBUrl("jdbc:mysql://localhost:3306/flink")
>                .setUsername("root")
>                .setPassword("123456")
>                .setQuery(sql2)
>                .setParameterTypes(types)
> .setBatchSize(1000)
>               .build();
>
> =========== 问题 ================
> 如果上游数据来源时间是:
> 10:00 -> 900条
> 10:10 -> 120条
> 11:50 -> 1100条
> 15:00 -> 900条
>
> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
> 10:10 -> 写入1000条,剩20条下次写入
> 11:50 -> 写入1000条,剩30条下次写入
> 15:00 -> 写入1000条,剩10条下次写入
>
> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
> 10:10 -> 写入1000条,剩20条下次写入
> 10:30 -> 写入20条
> 11:50 -> 写入1000条,剩10条下次写入
> 12:10 -> 写入10条
> 15:20 -> 写入900条
>
> thanks