We were testing JDBC writes to PostgreSQL in our test environment, using tcpkill to simulate the connection being closed, in order to check how exceptions are handled. We kept seeing logs like the following:
2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
    at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
    at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
    at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Judging from the log, the closed connection only ever causes one retry. Here is Flink's retry code:

// JDBCUpsertOutputFormat.java
public synchronized void flush() throws Exception {
    checkFlushException();

    for (int i = 1; i <= maxRetryTimes; i++) {
        try {
            jdbcWriter.executeBatch();
            batchCount = 0;
            break;
        } catch (SQLException e) {
            LOG.error("JDBC executeBatch error, retry times = {}", i, e);
            if (i >= maxRetryTimes) {
                throw e;
            }
            Thread.sleep(1000 * i);
        }
    }
}

Remote debugging shows that on the first call chain

    JDBCUpsertOutputFormat.flush
      -> AppendOnlyWriter.executeBatch
         ...
         -> PgConnection.getAutoCommit

when the PSQLException "This connection has been closed" is thrown, batchStatements has already been cleared:

// PgStatement.java
private BatchResultHandler internalExecuteBatch() throws SQLException {
    // Construct query/parameter arrays.
    transformQueriesAndParameters();
    // Empty arrays should be passed to toArray
    // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
    Query[] queries = batchStatements.toArray(new Query[0]);
    ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]);
    batchStatements.clear(); // already cleared at this point
    batchParameters.clear();
    ...
    if (connection.getAutoCommit()) { // throws the exception
        flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }
    ...
}

So when Flink retries and calls jdbcWriter.executeBatch a second time, batchStatements is already empty and the call returns immediately. Flink treats this as a success, so doesn't that mean the statements are in fact never executed?

// PgStatement.java
public int[] executeBatch() throws SQLException {
    checkClosed();
    closeForNextExecution();
    if (batchStatements == null || batchStatements.isEmpty()) { // returns right here
        return new int[0];
    }
    return internalExecuteBatch().getUpdateCount();
}

My current idea is: when the exception is caught, check whether the connection has been closed before retrying, and if it has, re-open it. I'd like to hear your opinions. Here is the issue I filed:
https://issues.apache.org/jira/browse/FLINK-16708
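The driver-side behavior is easy to reproduce outside Flink. Below is a minimal standalone demonstration (a sketch only; the URL, credentials, and the table t(v int) are placeholders, not from our setup). It shows the JDBC contract that executeBatch clears the driver's internal batch, so a second call is a silent no-op returning new int[0], exactly what Flink's retry mistakes for success:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class EmptyBatchDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/test", "user", "pass");
             PreparedStatement ps = conn.prepareStatement("INSERT INTO t (v) VALUES (?)")) {
            ps.setInt(1, 1);
            ps.addBatch();
            int[] first = ps.executeBatch();  // executes one statement; the driver then clears its internal batch
            int[] second = ps.executeBatch(); // batch is now empty: returns new int[0], which looks like success
            System.out.println(first.length + " / " + second.length); // prints "1 / 0"
        }
    }
}

In the failure case above the clear happens before the exception is thrown, so the retried executeBatch is the same silent no-op.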
Hi,
Could [1] solve your problem? Or do you need to distinguish between different exceptions?

[1] https://issues.apache.org/jira/browse/FLINK-16281

Best,
Jingsong Lee

On Mon, Mar 23, 2020 at 2:34 PM lucas.wu <[hidden email]> wrote:

> Hi, we ran into this kind of problem before when writing to MySQL. The task's open function is called once at task startup, and that is where the JDBC connection is initialized. When the JDBC connection is dropped for whatever reason, for example idle time exceeding max_idle_timeout, flush fails and the whole task restarts. So we wrote our own outputFormat, modeled on the official JDBCUpsertOutputFormat, that adds a connection check and reconnection. If your consistency requirements are not strict, you can also catch exceptions around flush.
Hi, I took a quick look at the issue you provided. It should fix the problem where, after the connection is closed, the second attempt is mistakenly treated as successful. But there is another problem: if the connection has been closed, even three retries will not succeed. Wouldn't it be better if the retry logic also checked whether the connection has been closed and, if so, tried to reconnect, only giving up after all three attempts fail?
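For illustration, a sketch of what such a retry loop could look like. Assumptions: Connection.isValid(int) is standard JDBC 4; establishConnection() and createWriter() are hypothetical helpers for re-opening the connection and rebuilding jdbcWriter, not necessarily the final Flink API:

public synchronized void flush() throws Exception {
    checkFlushException();

    for (int i = 1; i <= maxRetryTimes; i++) {
        try {
            jdbcWriter.executeBatch();
            batchCount = 0;
            break;
        } catch (SQLException e) {
            LOG.error("JDBC executeBatch error, retry times = {}", i, e);
            if (i >= maxRetryTimes) {
                throw e;
            }
            try {
                // If the connection has been closed, reconnect before the next
                // attempt; otherwise retrying is guaranteed to fail again.
                if (connection == null || !connection.isValid(60)) {
                    establishConnection();       // hypothetical: re-open the JDBC connection
                    jdbcWriter = createWriter(); // hypothetical: rebuild the writer on the new connection
                }
            } catch (Exception reconnectException) {
                LOG.error("Reconnect to JDBC server failed", reconnectException);
                throw e;
            }
            Thread.sleep(1000 * i);
        }
    }
}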
Hi,
> add a check: if the connection has been closed, try to reconnect

👍 Absolutely, that is a reasonable requirement. Consider updating the JIRA description accordingly. (A contribution would be even better.)

Best,
Jingsong Lee
OK, I will update the JIRA description and submit a patch. Thanks everyone for the replies!
In reply to this post by shangwen
Hi, shangwen
This should be a bug in AppendOnlyWriter [1], which has already been fixed in 1.10.1 and 1.11-SNAPSHOT (master). Using 1.10.1 or the master branch will solve it. 1.10.1 has not been released yet; as far as I know, the community is preparing the 1.10.1 release. If you need the fix urgently, you can refer to the code on the release-1.10 branch [2].

Best,
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-16281
[2] https://github.com/apache/flink/tree/release-1.10
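For readers following along, one possible shape of a retry-safe writer. This is purely a sketch under my own assumptions, not the actual FLINK-16281 patch: keep the rows in an application-side buffer and rebuild the JDBC batch on every attempt, so a retry after a failed executeBatch is never a silent no-op:

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.types.Row;

class RetrySafeAppendWriter {
    private final PreparedStatement statement;
    private final List<Row> buffer = new ArrayList<>(); // survives a failed executeBatch

    RetrySafeAppendWriter(PreparedStatement statement) {
        this.statement = statement;
    }

    void addRecord(Row row) {
        buffer.add(row);
    }

    void executeBatch() throws SQLException {
        if (buffer.isEmpty()) {
            return;
        }
        // Rebuild the driver-side batch from our own buffer on every attempt,
        // even if the driver already cleared its batch during a failed call.
        for (Row row : buffer) {
            for (int i = 0; i < row.getArity(); i++) {
                statement.setObject(i + 1, row.getField(i));
            }
            statement.addBatch();
        }
        statement.executeBatch();
        buffer.clear(); // only forget rows after a successful execution
    }
}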