flink jdbc connector: how to ensure all buffered data is flushed to the database at checkpoint time


flink jdbc connector: how to ensure all buffered data is flushed to the database at checkpoint time

jie mei
Hi, Community

The JDBC connector does not seem to guarantee that buffered data is fully written to the database at checkpoint time. The reason is that OutputFormat exposes no hook that can be invoked during a checkpoint. Judging from the JDBC connector's code, the only option is a flush timeout, so records can still be lost.

My question: is there a way to force the buffered data to be flushed to the database?


@Public
public interface OutputFormat<IT> extends Serializable {

   /**
    * Configures this output format. Since output formats are instantiated
    * generically and hence parameterless, this method is the place where the
    * output formats set their basic fields based on configuration values.
    * <p>
    * This method is always called first on a newly instantiated output format.
    *
    * @param parameters The configuration with all parameters.
    */
   void configure(Configuration parameters);

   /**
    * Opens a parallel instance of the output format to store the result of
    * its parallel instance.
    * <p>
    * When this method is called, the output format is guaranteed to be configured.
    *
    * @param taskNumber The number of the parallel instance.
    * @param numTasks The number of parallel tasks.
    * @throws IOException Thrown, if the output could not be opened due to an
    *                     I/O problem.
    */
   void open(int taskNumber, int numTasks) throws IOException;


   /**
    * Adds a record to the output.
    * <p>
    * When this method is called, the output format is guaranteed to be opened.
    *
    * @param record The record to add to the output.
    * @throws IOException Thrown, if the record could not be added due to an
    *                     I/O problem.
    */
   void writeRecord(IT record) throws IOException;

   /**
    * Method that marks the end of the life-cycle of a parallel output
    * instance. Should be used to close channels and streams and release
    * resources. After this method returns without an error, the output is
    * assumed to be correct.
    * <p>
    * When this method is called, the output format is guaranteed to be opened.
    *
    * @throws IOException Thrown, if the output could not be closed properly.
    */
   void close() throws IOException;
}


--

*Best Regards*
*Jeremy Mei*

Re: flink jdbc connector: how to ensure all buffered data is flushed to the database at checkpoint time

hailongwang
Hi,

Yes, I think you are right.
`JdbcOutputFormat` is wrapped in `OutputFormatSinkFunction`, and `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so there is no way to call format.flush() during snapshotState.
WDYT @Jark @Leonard


Best,
Hailong

On 2020-12-09 17:13:14, "jie mei" <[hidden email]> wrote:


Re: flink jdbc connector: how to ensure all buffered data is flushed to the database at checkpoint time

Jark
Administrator
Hi Jie,

This does look like a problem.
The JdbcBatchingOutputFormat returned by the sink is not wrapped in a GenericJdbcSinkFunction.
Could you help create an issue for it?

Best,
Jark

On Thu, 10 Dec 2020 at 02:05, hailongwang <[hidden email]> wrote:


Re: flink jdbc connector: how to ensure all buffered data is flushed to the database at checkpoint time

jie mei
Hi, Jark

OK, I will create an issue for this.

On Thu, Dec 10, 2020 at 11:17 AM, Jark Wu <[hidden email]> wrote:



--

*Best Regards*
*Jeremy Mei*

Re: flink jdbc connector: how to ensure all buffered data is flushed to the database at checkpoint time

Leonard Xu
Your analysis is correct; this is a bug. SinkFunctionProvider should be used here, wrapping the format in another layer with GenericJdbcSinkFunction, rather than OutputFormatProvider, because OutputFormatSinkFunction does not implement CheckpointedFunction and so cannot guarantee that buffered data is flushed to the database at checkpoint time. Put another way, the OutputFormat does not participate in checkpointing at all, so not even at-least-once can be guaranteed.

The fix should be quite simple. @jie mei, would you be interested in helping fix it?

Best,
Leonard
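[Editor's illustration] The pattern Leonard describes can be modeled in a few lines of plain Java. The class names below (`BufferingOutputFormat`, `CheckpointAwareSink`) are hypothetical stand-ins for `JdbcBatchingOutputFormat` and `GenericJdbcSinkFunction`, not Flink's actual classes; in real Flink the checkpoint hook is `CheckpointedFunction.snapshotState`, and the sink wrapper flushes the format there so the buffer is empty when the checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the problem: an output format that buffers records
// in memory and only writes them out when flush() is called.
class BufferingOutputFormat {
    final List<String> buffer = new ArrayList<>();
    final List<String> database = new ArrayList<>(); // stands in for the JDBC target table

    void writeRecord(String record) {
        buffer.add(record); // record sits in memory until the next flush
    }

    void flush() {
        database.addAll(buffer); // batch write, like JdbcBatchingOutputFormat's flush
        buffer.clear();
    }
}

// Sketch of the fix: a sink wrapper that, like GenericJdbcSinkFunction,
// participates in checkpointing and flushes the format's buffer there.
class CheckpointAwareSink {
    final BufferingOutputFormat format;

    CheckpointAwareSink(BufferingOutputFormat format) {
        this.format = format;
    }

    void invoke(String record) {
        format.writeRecord(record);
    }

    // In Flink this would be CheckpointedFunction.snapshotState(...).
    // Flushing here guarantees no buffered record is lost if the job
    // fails after the checkpoint, restoring at-least-once semantics.
    void snapshotState() {
        format.flush();
    }
}
```

Without the `snapshotState` flush, any records still in `buffer` when a checkpoint completes would be lost on failure, which is exactly the gap the thread identifies in `OutputFormatSinkFunction`.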



> On Dec 10, 2020, at 11:22, jie mei <[hidden email]> wrote:


Re: flink jdbc connector: how to ensure all buffered data is flushed to the database at checkpoint time

jie mei
Hi, Leonard


OK, I will open a PR to fix this issue.


On Thu, Dec 10, 2020 at 12:10 PM, Leonard Xu <[hidden email]> wrote:

--

*Best Regards*
*Jeremy Mei*