Hi, Community

The JDBC connector does not seem to guarantee that all buffered data has been written to the database when a checkpoint is taken. This is because OutputFormat has no interface that can be called at checkpoint time. Judging from the JDBC connector's code, the only option is to set a timeout after which data is flushed, but there can still be cases where data is lost.

My question is: is there a way to force the buffered data to be flushed to the database?

@Public
public interface OutputFormat<IT> extends Serializable {

    /**
     * Configures this output format. Since output formats are instantiated generically and hence parameterless,
     * this method is the place where the output formats set their basic fields based on configuration values.
     * <p>
     * This method is always called first on a newly instantiated output format.
     *
     * @param parameters The configuration with all parameters.
     */
    void configure(Configuration parameters);

    /**
     * Opens a parallel instance of the output format to store the result of its parallel instance.
     * <p>
     * When this method is called, the output format is guaranteed to be configured.
     *
     * @param taskNumber The number of the parallel instance.
     * @param numTasks The number of parallel tasks.
     * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
     */
    void open(int taskNumber, int numTasks) throws IOException;

    /**
     * Adds a record to the output.
     * <p>
     * When this method is called, the output format is guaranteed to be opened.
     *
     * @param record The record to add to the output.
     * @throws IOException Thrown, if the record could not be added due to an I/O problem.
     */
    void writeRecord(IT record) throws IOException;

    /**
     * Method that marks the end of the life-cycle of parallel output instance. Should be used to close
     * channels and streams and release resources.
     * After this method returns without an error, the output is assumed to be correct.
     * <p>
     * When this method is called, the output format is guaranteed to be opened.
     *
     * @throws IOException Thrown, if the output could not be closed properly.
     */
    void close() throws IOException;
}

--
*Best Regards*
*Jeremy Mei*
Hi,

Yes, I think you are right. `JdbcOutputFormat` gets wrapped in `OutputFormatSinkFunction`, and `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so there is no way to call `format.flush` during `snapshotState`.

WDYT @Jark @Leonard

Best,
Hailong

(In reply to jie mei's message of 2020-12-09 17:13:14.)
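To make the gap described above concrete, here is a minimal, self-contained sketch. It does not use Flink's classes: `BufferingFormat` and `PlainSink` are hypothetical stand-ins for a batching output format and for a sink wrapper that has no checkpoint hook, and the in-memory `database` list is an assumption that stands in for the external database.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point: writes five records through a sink that cannot flush on checkpoint.
class CheckpointGapDemo {
    public static void main(String[] args) {
        BufferingFormat format = new BufferingFormat(3);
        PlainSink sink = new PlainSink(format);
        for (String record : new String[] {"a", "b", "c", "d", "e"}) {
            sink.invoke(record);
        }
        // If a checkpoint completed at this point, d and e would count as processed
        // even though they only exist in the in-memory buffer.
        System.out.println("in database: " + format.database());
        System.out.println("still buffered: " + format.buffered());
    }
}

// Hypothetical stand-in for a batching output format (not Flink's real class).
class BufferingFormat {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> database = new ArrayList<>(); // stands in for the external DB

    BufferingFormat(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffers the record and flushes only once the batch is full.
    void writeRecord(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Moves every buffered record to the "database".
    void flush() {
        database.addAll(buffer);
        buffer.clear();
    }

    int buffered() {
        return buffer.size();
    }

    List<String> database() {
        return database;
    }
}

// Hypothetical sink wrapper without any checkpoint hook.
class PlainSink {
    private final BufferingFormat format;

    PlainSink(BufferingFormat format) {
        this.format = format;
    }

    void invoke(String record) {
        format.writeRecord(record);
    }
    // There is no snapshotState() here, so nothing can call format.flush() at checkpoint time.
}
```

With a batch size of 3, the first three records reach the database and the last two stay in the buffer. If the job failed after a checkpoint that covered all five records, those two would not be replayed and would be lost.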
Hi Jie,

This does look like a problem. The JdbcBatchingOutputFormat returned by the sink is not wrapped in a GenericJdbcSinkFunction. Could you help create an issue?

Best,
Jark

(In reply to hailongwang's message of Thu, 10 Dec 2020 at 02:05.)
Hi, Jark

OK, I will create an issue for this.

(In reply to Jark Wu's message of Thursday, 10 Dec 2020, 11:17 AM.)

--
*Best Regards*
*Jeremy Mei*
Your analysis is correct, this is a bug. The code here should use SinkFunctionProvider and wrap the format in a GenericJdbcSinkFunction instead of using OutputFormatProvider. OutputFormatSinkFunction does not implement CheckpointedFunction, so it cannot guarantee that buffered data is flushed to the database at checkpoint time. Put another way, an OutputFormat does not take part in checkpointing, so even at-least-once may not be guaranteed.

The fix should be simple. @jie mei, would you be interested in helping to fix it?

Best regards,
Leonard

(In reply to jie mei's message of 10 Dec 2020, 11:22.)
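The direction of the proposed fix can be sketched roughly as follows. This is only an illustration under stated assumptions, not the actual Flink patch: `CheckpointHook` is a hypothetical, simplified stand-in for `CheckpointedFunction` (the real interface has different method signatures), and `BatchingFormat` and `FlushingSink` are made-up names that play the roles of the batching format and of a wrapper such as GenericJdbcSinkFunction.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point: the wrapper flushes the buffer whenever a checkpoint is taken.
class FlushOnCheckpointDemo {
    public static void main(String[] args) {
        BatchingFormat format = new BatchingFormat(3);
        FlushingSink sink = new FlushingSink(format);
        for (String record : new String[] {"a", "b", "c", "d", "e"}) {
            sink.invoke(record);
        }
        System.out.println("buffered before checkpoint: " + format.buffered());
        sink.snapshotState(); // what the runtime would trigger at checkpoint time
        System.out.println("buffered after checkpoint: " + format.buffered());
        System.out.println("in database: " + format.database());
    }
}

// Hypothetical, simplified stand-in for a checkpoint callback interface.
interface CheckpointHook {
    void snapshotState();
}

// Hypothetical batching format; the list named database stands in for the external DB.
class BatchingFormat {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> database = new ArrayList<>();

    BatchingFormat(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffers the record and flushes only once the batch is full.
    void writeRecord(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Moves every buffered record to the "database".
    void flush() {
        database.addAll(buffer);
        buffer.clear();
    }

    int buffered() {
        return buffer.size();
    }

    List<String> database() {
        return database;
    }
}

// Hypothetical wrapper that takes part in checkpointing by flushing the format.
class FlushingSink implements CheckpointHook {
    private final BatchingFormat format;

    FlushingSink(BatchingFormat format) {
        this.format = format;
    }

    void invoke(String record) {
        format.writeRecord(record);
    }

    @Override
    public void snapshotState() {
        // Everything received before the checkpoint is written out before it completes.
        format.flush();
    }
}
```

Because the flush happens inside the checkpoint callback, every record received before the checkpoint is in the database by the time the checkpoint completes. After a failure, records received since the last checkpoint are replayed, which is the at-least-once behaviour discussed in this thread.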
Hi, Leonard

OK, I will open a PR to fix this issue.

(In reply to Leonard Xu's message of Thursday, 10 Dec 2020, 12:10 PM.)

--
*Best Regards*
*Jeremy Mei*