mysql sink connection timeout

mysql sink connection timeout

shizk233
Hi All,
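I have recently been using Flink to process data and write it to a MySQL sink. Because our workload has no incoming data at night, the connection hits MySQL's wait_timeout limit (8 hours by default) and becomes invalid.
Even with autoReconnect=true added to the MySQL URL, the exception shown below is still thrown.

Version information:
flink 1.10.1
mysql server 5.6.47
mysql Connector/J 5.1.49

Questions:
1. Can Flink's JDBC connector use a connection pool model? If it only uses a single connection, could some kind of heartbeat be added to keep it active?
2. Could data be lost after the connection becomes invalid (I could not find a state field that stores the records in the source code)?
3. With the current version, is there a good solution for connections becoming invalid?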

Thanks,
Xuhui Mao

Exception details:
2020-06-24 22:39:46,923 ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat      - JDBC executeBatch error, retry times = 1
java.sql.SQLException: Could not retrieve transaction read-only status from server
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:878)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:874)
    at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3523)
    at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3490)
    at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1287)
    at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:954)
    at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:70)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:161)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 10,384,059 milliseconds ago.  The last packet sent successfully to the server was 10,384,063 milliseconds ago.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:403)
    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:990)
    at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3706)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2506)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2675)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2465)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2439)
    at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1365)
    at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3515)
    ... 13 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3688)
    ... 19 more

Re: mysql sink connection timeout

13122260573@163.com
You can refer to this JIRA:
https://issues.apache.org/jira/browse/FLINK-12494
1. Throw the exception and let the Flink runtime handle it;
2. Handle it in the OutputFormat.
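
The two options above can be combined in one loop: retry inside the output format a bounded number of times, re-establishing the connection between attempts, and rethrow once the retries are exhausted so that the Flink runtime (its restart strategy) takes over. The following is only a sketch under assumptions: `RetryingFlusher`, `SqlAction`, and `flushWithRetry` are hypothetical names, not Flink's API, and the caller is assumed to keep its buffered records until the flush succeeds.

```java
import java.sql.SQLException;

/** Hypothetical helper, not part of Flink: bounded retry with reconnect around a batch flush. */
class RetryingFlusher {

    /** A step that may fail with an SQLException (the flush itself, or the reconnect). */
    interface SqlAction {
        void run() throws SQLException;
    }

    /**
     * Runs the flush; on failure, reconnects and tries again, up to maxRetries extra attempts.
     * When every attempt fails, the last exception is rethrown so the caller
     * (for example the Flink task) can fail and be restarted.
     */
    static void flushWithRetry(SqlAction flush, SqlAction reconnect, int maxRetries)
            throws SQLException {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        SQLException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                flush.run();
                return; // success, nothing more to do
            } catch (SQLException e) {
                last = e;
                if (attempt < maxRetries) {
                    reconnect.run(); // replace the broken connection before the next attempt
                }
            }
        }
        throw last; // option 1: let the runtime handle it
    }
}
```

Whether a retried batch can produce duplicate rows depends on the statement (plain insert versus upsert), so that needs to be checked for the actual table.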


Zhonghan Tang


Replying to shizk233's message of 06/30/2020 11:53.

Re: mysql sink connection timeout

shizk233
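Hi Zhonghan Tang,

I looked at the reconnection PR associated with that JIRA, https://github.com/apache/flink/pull/8429, but that PR mainly uses a reconnect mechanism to work around connection failures caused by network problems.
Because the data in my use case is sparse, losing the connection to wait_timeout is a common occurrence, so a reconnect mechanism with a maximum retry count is not a good fit.

What I mainly need is a way to keep the connection alive persistently.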
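
One way to picture the keep-alive being asked for is a small background task that periodically calls the standard JDBC method `Connection.isValid`, which has the driver verify the connection against the server. This is a minimal sketch under assumptions: `ConnectionHeartbeat` and its `writeLock` parameter are hypothetical names, not anything in Flink's JDBC connector, and it assumes (not verified in this thread) that the driver's validity check counts as activity for wait_timeout.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of Flink: pings an idle JDBC connection on a schedule. */
class ConnectionHeartbeat implements AutoCloseable {
    private final Connection connection;
    private final Object writeLock; // assumed to be the same lock the writer holds while flushing
    private final int checkTimeoutSeconds;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "jdbc-heartbeat");
                t.setDaemon(true); // do not keep the JVM alive just for the heartbeat
                return t;
            });

    ConnectionHeartbeat(Connection connection, Object writeLock, int checkTimeoutSeconds) {
        this.connection = connection;
        this.writeLock = writeLock;
        this.checkTimeoutSeconds = checkTimeoutSeconds;
    }

    /** Starts pinging; the period should be well below the server's wait_timeout. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(this::ping, period, period, unit);
    }

    /** Returns true if the driver reports the connection as valid. Never throws. */
    boolean ping() {
        synchronized (writeLock) { // avoid using the connection while a batch is being written
            try {
                return connection.isValid(checkTimeoutSeconds);
            } catch (SQLException | RuntimeException e) {
                // An uncaught exception would cancel all later scheduled runs.
                return false;
            }
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Usage would look like `new ConnectionHeartbeat(conn, lock, 5).start(30, TimeUnit.MINUTES)`, with the 30-minute period chosen here purely for illustration. A heartbeat only prevents idle timeouts; a connection lost for other reasons still has to be re-established.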

Thanks,
Xuhui Mao

Replying to Zhonghan Tang's message of Tue, Jun 30, 2020, 12:05 PM.


Re: mysql sink connection timeout

LakeShen
Hi shizk233,

Take a look at this JIRA: https://issues.apache.org/jira/browse/FLINK-16681

It covers exactly this problem: the connection is dropped after a long period with no data.
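
For a connection that goes stale while idle, a common pattern is to validate it right before each use and open a fresh one if the check fails, which needs no retry limit. The sketch below illustrates that pattern only and is not taken from the FLINK-16681 patch; `ValidatingConnectionHolder` and `ConnectionFactory` are hypothetical names, and in a real job the factory would presumably wrap `DriverManager.getConnection`.

```java
import java.sql.Connection;
import java.sql.SQLException;

/** Hypothetical helper: hands out a connection, replacing it first if it has gone stale. */
class ValidatingConnectionHolder {

    /** Hypothetical factory; opens a new JDBC connection on demand. */
    interface ConnectionFactory {
        Connection create() throws SQLException;
    }

    private final ConnectionFactory factory;
    private final int checkTimeoutSeconds;
    private Connection connection;

    ValidatingConnectionHolder(ConnectionFactory factory, int checkTimeoutSeconds) {
        this.factory = factory;
        this.checkTimeoutSeconds = checkTimeoutSeconds;
    }

    /** Returns the current connection if the driver says it is valid, else a new one. */
    synchronized Connection getValidConnection() throws SQLException {
        if (connection == null || !connection.isValid(checkTimeoutSeconds)) {
            closeQuietly();
            connection = factory.create();
        }
        return connection;
    }

    private void closeQuietly() {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ignored) {
                // The connection is already unusable; nothing useful to do here.
            }
        }
    }
}
```

Any prepared statements created on the old connection belong to it, so they would have to be prepared again on the replacement before the next batch is written.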

Best,
LakeShen

Replying to shizk233's message of Tue, Jun 30, 2020, 1:34 PM.


Re: mysql sink connection timeout

shizk233
Hi LakeShen,

Thanks! That is exactly it! I will give it a try right away.


Thanks,
Xuhui Mao

Replying to LakeShen's message of Tue, Jun 30, 2020, 2:06 PM.
