Flink 1.10: the retry mechanism of JDBCUpsertOutputFormat's flush method does not take effect


Flink 1.10: the retry mechanism of JDBCUpsertOutputFormat's flush method does not take effect

shangwen
We were testing JDBC writes to PostgreSQL in our test environment, using tcpkill to simulate the connection being closed so as to test resilience to failures, and we found logs like the following being printed over and over:


2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
        at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
        at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
        at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
        at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
        at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
        at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
        at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
        at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)



Judging from the log, the closed connection only ever results in a single retry. Here is Flink's retry code:
// JDBCUpsertOutputFormat.java
public synchronized void flush() throws Exception {
   checkFlushException();

   for (int i = 1; i <= maxRetryTimes; i++) {
      try {
         jdbcWriter.executeBatch();
         batchCount = 0;
         break;
      } catch (SQLException e) {
         LOG.error("JDBC executeBatch error, retry times = {}", i, e);
         if (i >= maxRetryTimes) {
            throw e;
         }
         Thread.sleep(1000 * i);
      }
   }
}


Remote debugging shows that on the first execution of

JDBCUpsertOutputFormat.flush
  -> AppendOnlyWriter.executeBatch
     ...
     -> PgConnection.getAutoCommit

when PSQLException: This connection has been closed is thrown, batchStatements has already been cleared before that point:
// PgStatement.java
private BatchResultHandler internalExecuteBatch() throws SQLException {
  // Construct query/parameter arrays.
  transformQueriesAndParameters();
  // Empty arrays should be passed to toArray
  // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
  Query[] queries = batchStatements.toArray(new Query[0]);
  ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]);
  batchStatements.clear(); // already cleared at this point
  batchParameters.clear();
  ...
  if (connection.getAutoCommit()) { // throws the exception
    flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
  }
  ...
}


So when Flink retries and runs jdbcWriter.executeBatch a second time, batchStatements is already empty and the call returns immediately. Flink therefore treats the flush as successful, even though the statements were never actually executed:
// PgStatement.java
public int[] executeBatch() throws SQLException {
  checkClosed();
  closeForNextExecution();
  if (batchStatements == null || batchStatements.isEmpty()) { // returns immediately here
    return new int[0];
  }
  return internalExecuteBatch().getUpdateCount();
}
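
To see this clearing behavior in isolation, outside Flink: a minimal standalone sketch, assuming a reachable PostgreSQL with a table t(id int); the connection details here are made up. The driver clears batchStatements during executeBatch whether or not execution succeeds, so a second call sees an empty batch and "succeeds" without touching the database:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class BatchClearDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/test", "postgres", "postgres");
             PreparedStatement ps = conn.prepareStatement("INSERT INTO t VALUES (?)")) {
            ps.setInt(1, 1);
            ps.addBatch();
            // First call executes the one queued statement.
            System.out.println(ps.executeBatch().length); // prints 1
            // batchStatements was cleared during the first call, so this call
            // returns an empty result without executing anything - the same
            // thing Flink's retry runs into after the failed first attempt.
            System.out.println(ps.executeBatch().length); // prints 0
        }
    }
}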


My current idea: when the exception is caught, check whether the connection has been closed before retrying, and reopen it if it has. I'd like to hear what everyone thinks. This is the issue I filed:
https://issues.apache.org/jira/browse/FLINK-16708
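
To make that concrete, the change I have in mind looks roughly like this (a sketch only, not tested; Connection.isValid is standard JDBC, while CONNECTION_CHECK_TIMEOUT_SECONDS and reopenConnection() are hypothetical names for the timeout constant and a helper that re-creates the connection and the statements inside jdbcWriter):

public synchronized void flush() throws Exception {
   checkFlushException();

   for (int i = 1; i <= maxRetryTimes; i++) {
      try {
         jdbcWriter.executeBatch();
         batchCount = 0;
         break;
      } catch (SQLException e) {
         LOG.error("JDBC executeBatch error, retry times = {}", i, e);
         if (i >= maxRetryTimes) {
            throw e;
         }
         // Check the connection before the next attempt and rebuild it if it
         // has gone away. Note this only helps if the writer can re-submit its
         // rows (e.g. from its own buffer); otherwise the driver has already
         // dropped the batch.
         if (connection == null || !connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
            reopenConnection(); // hypothetical helper: close the old connection, open a new one, re-create statements
         }
         Thread.sleep(1000 * i);
      }
   }
}
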
Re: Flink 1.10: the retry mechanism of JDBCUpsertOutputFormat's flush method does not take effect

Jingsong Li
Hi,

Would [1] solve your problem? Or do you need to handle the different exceptions separately?

[1] https://issues.apache.org/jira/browse/FLINK-16281

Best,
Jingsong Lee

On Mon, Mar 23, 2020 at 2:34 PM lucas.wu <[hidden email]> wrote:

> hi, we ran into this kind of problem before when writing to MySQL. The task's
> open function is called only once, at task startup, and it is where the jdbc
> connection is initialized. When the jdbc connection drops for whatever reason,
> e.g. the idle time exceeds max_idel_timeout, every flush fails and the whole
> task restarts. So we wrote our own outputFormat modeled on the official
> JDBCUpsertOutputFormat, adding a connection check and reconnection; if your
> consistency requirements are not strict, you can also catch exceptions around
> flush.

Re: Flink 1.10: the retry mechanism of JDBCUpsertOutputFormat's flush method does not take effect

shangwen
hi, I took a quick look at the issue you linked. It should fix the problem of the second execution being mistakenly treated as successful after the connection is closed. But there is another issue: if the connection has been closed, retrying three times still cannot succeed. Wouldn't it be better if the retry also checked whether the connection is closed and, if so, tried to re-establish it, only giving up after all three attempts fail?




Re: Flink 1.10: the retry mechanism of JDBCUpsertOutputFormat's flush method does not take effect

Jingsong Li
Hi,

> add a check: if the connection has been closed, try to re-establish it

👍 Absolutely, that's a reasonable requirement. Consider updating the JIRA description accordingly. (A contribution would be even better.)

Best,
Jingsong Lee


Re: Flink 1.10: the retry mechanism of JDBCUpsertOutputFormat's flush method does not take effect

shangwen
OK, I will update the JIRA description and submit the code. Thanks everyone for the replies!




Re: Flink 1.10: the retry mechanism of JDBCUpsertOutputFormat's flush method does not take effect

Leonard Xu
In reply to this post by shangwen
Hi, shangwen

This should be a bug in AppendOnlyWriter [1], and it has already been fixed in 1.10.1 / 1.11-SNAPSHOT (master).
Using 1.10.1 or the master branch will do. 1.10.1 has not been released yet; as far as I know, the community is preparing the 1.10.1 release.
If you need the fix urgently, you can refer to the code on the release-1.10 branch [2].

Best,
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-16281
[2] https://github.com/apache/flink/tree/release-1.10
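
For anyone curious, the gist of that fix, as I understand it (a sketch of the idea rather than the actual patch), is to keep a writer-side buffer of the pending rows, so a retry can rebuild the JDBC batch even after the driver has cleared its own copy; setRecordToStatement below is a stand-in for whatever binds the row's fields to the statement:

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.types.Row;

class BufferedAppendWriter {
    private final PreparedStatement statement;
    private final List<Row> buffer = new ArrayList<>();

    BufferedAppendWriter(PreparedStatement statement) {
        this.statement = statement;
    }

    void addRecord(Row row) {
        buffer.add(row);
    }

    void executeBatch() throws SQLException {
        if (buffer.isEmpty()) {
            return;
        }
        // Re-populate the statement's batch on every attempt: the driver
        // clears its internal batch whether or not execution succeeds, so
        // the rows must come from our own buffer, not from the driver.
        for (Row row : buffer) {
            setRecordToStatement(statement, row);
            statement.addBatch();
        }
        statement.executeBatch();
        // Forget the rows only after execution actually succeeded.
        buffer.clear();
    }

    private void setRecordToStatement(PreparedStatement ps, Row row) throws SQLException {
        for (int i = 0; i < row.getArity(); i++) {
            ps.setObject(i + 1, row.getField(i));
        }
    }
}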
