flink数据sink到mysql 是事务处理

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

flink数据sink到mysql 是事务处理

1101300123


目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时后会出现mysql死锁!想了解一下大家是怎么处理sink端多条记录的操作
sink的invoke代码
@Override
public void invoke(Tuple5<String, String, String, String, List<BroadBandReq>> value, Context context) throws Exception {
connection.setAutoCommit(false);
List<BroadBandReq> f4 = value.f4;
for (BroadBandReq rs: f4){
statement.setString(1,rs.getUserId());
statement.setString(2,rs.getPhoneNum());
statement.setString(3,rs.getProvId());
statement.addBatch();
}
try {
statement.executeBatch();
connection.commit();
}catch (Exception e){
LOG.info(" add data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3,f4);
connection.rollback();
e.printStackTrace();
            throw new Exception(e);
}
    }




java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
        at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
        at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
        at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
        at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
        at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
        at com.mysql.jdbc.Util.getInstance(Util.java:408)
        at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
        at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772)
        at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
        at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
        at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67)
        ... 33 more
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
        at com.mysql.jdbc.Util.getInstance(Util.java:408)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
        at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
        at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
        at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
        ... 36 more
[flink-akka.actor.default-dispatcher-783]











Reply | Threaded
Open this post in threaded view
|

Re: flink数据sink到mysql 是事务处理

Px New
Hi 我最近在处理幂等性写入Mysql 但相关文档太少并没有实质性的操作, 所以方便参考下你这边事务写入的code吗? 非常感谢你
也可发code到我的email [hidden email]😀

1101300123 <[hidden email]> 于2020年4月10日周五 上午11:42写道:

>
>
> 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端
> 多条记录的操作
> sink的invoke代码
> @Override
> public void invoke(Tuple5<String, String, String, String,
> List<BroadBandReq>> value, Context context) throws Exception {
> connection.setAutoCommit(false);
> List<BroadBandReq> f4 = value.f4;
> for (BroadBandReq rs: f4){
> statement.setString(1,rs.getUserId());
> statement.setString(2,rs.getPhoneNum());
> statement.setString(3,rs.getProvId());
> statement.addBatch();
> }
> try {
> statement.executeBatch();
> connection.commit();
> }catch (Exception e){
> LOG.info(" add data for rds ; operTag:{},
> userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1,
> value.f2, value.f3,f4);
> connection.rollback();
> e.printStackTrace();
>             throw new Exception(e);
> }
>     }
>
>
>
>
> java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when
> trying to get lock; try restarting transaction
>         at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
>         at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
>         at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>         at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
>         at
> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
>         at
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
>         at
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
>         at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>         at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>         at org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>         at org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>         at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>         at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.sql.BatchUpdateException: Deadlock found when trying to
> get lock; try restarting transaction
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>         at com.mysql.jdbc.Util.getInstance(Util.java:408)
>         at
> com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
>         at
> com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772)
>         at
> com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
>         at
> com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
>         at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67)
>         ... 33 more
> Caused by:
> com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock
> found when trying to get lock; try restarting transaction
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>         at com.mysql.jdbc.Util.getInstance(Util.java:408)
>         at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952)
>         at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
>         at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
>         at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
>         at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
>         at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
>         at
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
>         at
> com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
>         at
> com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
>         ... 36 more
> [flink-akka.actor.default-dispatcher-783]
>
>
>
>
>
>
>
>
>
>
>
>
Reply | Threaded
Open this post in threaded view
|

回复: flink数据sink到mysql 是事务处理

Yichao Yang
Hi
      可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。


Best,
Yichao Yang




------------------ 原始邮件 ------------------
发件人: "Px New"<[hidden email]>;
发送时间: 2020年6月3日(星期三) 中午11:35
收件人: "user-zh"<[hidden email]>;

主题: Re: flink数据sink到mysql 是事务处理



Hi 我最近在处理幂等性写入Mysql 但相关文档太少并没有实质性的操作, 所有方便参观下你这边实务写入的code吗? 非常感谢你
也可发code到我的email [hidden email]😀

1101300123 <[hidden email]&gt; 于2020年4月10日周五 上午11:42写道:

&gt;
&gt;
&gt; 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端
&gt; 多条记录的操作
&gt; sink的invoke代码
&gt; @Override
&gt; public void invoke(Tuple5<String, String, String, String,
&gt; List<BroadBandReq&gt;&gt; value, Context context) throws Exception {
&gt; connection.setAutoCommit(false);
&gt; List<BroadBandReq&gt; f4 = value.f4;
&gt; for (BroadBandReq rs: f4){
&gt; statement.setString(1,rs.getUserId());
&gt; statement.setString(2,rs.getPhoneNum());
&gt; statement.setString(3,rs.getProvId());
&gt; statement.addBatch();
&gt; }
&gt; try {
&gt; statement.executeBatch();
&gt; connection.commit();
&gt; }catch (Exception e){
&gt; LOG.info(" add data for rds ; operTag:{},
&gt; userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1,
&gt; value.f2, value.f3,f4);
&gt; connection.rollback();
&gt; e.printStackTrace();
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; throw new Exception(e);
&gt; }
&gt;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt;
&gt;
&gt;
&gt;
&gt; java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when
&gt; trying to get lock; try restarting transaction
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.streaming.runtime.io
&gt; .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.streaming.runtime.io
&gt; .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.streaming.runtime.io
&gt; .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at java.lang.Thread.run(Thread.java:748)
&gt; Caused by: java.sql.BatchUpdateException: Deadlock found when trying to
&gt; get lock; try restarting transaction
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
&gt; Method)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.Util.getInstance(Util.java:408)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 33 more
&gt; Caused by:
&gt; com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock
&gt; found when trying to get lock; try restarting transaction
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
&gt; Method)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.Util.getInstance(Util.java:408)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 36 more
&gt; [flink-akka.actor.default-dispatcher-783]
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
Reply | Threaded
Open this post in threaded view
|

Re:回复: flink数据sink到mysql 是事务处理

Michael Ran
我们也会用幂等处理类似的东西。
1.你要么单条数据处理
2.要么保证每个事务之间不会出现冲突才行,比如楼上说的key by 之类的

















在 2020-06-03 15:41:44,"1048262223" <[hidden email]> 写道:

>Hi
>&nbsp; &nbsp; &nbsp; 可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。
>
>
>Best,
>Yichao Yang
>
>
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:&nbsp;"Px New"<[hidden email]&gt;;
>发送时间:&nbsp;2020年6月3日(星期三) 中午11:35
>收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
>主题:&nbsp;Re: flink数据sink到mysql 是事务处理
>
>
>
>Hi 我最近在处理幂等性写入Mysql 但相关文档太少并没有实质性的操作, 所有方便参观下你这边实务写入的code吗? 非常感谢你
>也可发code到我的email [hidden email]
>
>1101300123 <[hidden email]&gt; 于2020年4月10日周五 上午11:42写道:
>
>&gt;
>&gt;
>&gt; 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端
>&gt; 多条记录的操作
>&gt; sink的invoke代码
>&gt; @Override
>&gt; public void invoke(Tuple5<String, String, String, String,
>&gt; List<BroadBandReq&gt;&gt; value, Context context) throws Exception {
>&gt; connection.setAutoCommit(false);
>&gt; List<BroadBandReq&gt; f4 = value.f4;
>&gt; for (BroadBandReq rs: f4){
>&gt; statement.setString(1,rs.getUserId());
>&gt; statement.setString(2,rs.getPhoneNum());
>&gt; statement.setString(3,rs.getProvId());
>&gt; statement.addBatch();
>&gt; }
>&gt; try {
>&gt; statement.executeBatch();
>&gt; connection.commit();
>&gt; }catch (Exception e){
>&gt; LOG.info(" add data for rds ; operTag:{},
>&gt; userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1,
>&gt; value.f2, value.f3,f4);
>&gt; connection.rollback();
>&gt; e.printStackTrace();
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; throw new Exception(e);
>&gt; }
>&gt;&nbsp;&nbsp;&nbsp;&nbsp; }
>&gt;
>&gt;
>&gt;
>&gt;
>&gt; java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when
>&gt; trying to get lock; try restarting transaction
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.streaming.runtime.io
>&gt; .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.streaming.runtime.io
>&gt; .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.streaming.runtime.io
>&gt; .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at java.lang.Thread.run(Thread.java:748)
>&gt; Caused by: java.sql.BatchUpdateException: Deadlock found when trying to
>&gt; get lock; try restarting transaction
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>&gt; Method)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.Util.getInstance(Util.java:408)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 33 more
>&gt; Caused by:
>&gt; com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock
>&gt; found when trying to get lock; try restarting transaction
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>&gt; Method)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.Util.getInstance(Util.java:408)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
>&gt; com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 36 more
>&gt; [flink-akka.actor.default-dispatcher-783]
>&gt;
>&gt;
>&gt;
>&gt;
>&gt;
>&gt;
>&gt;
>&gt;
>&gt;
>&gt;
>&gt;
>&gt;
Reply | Threaded
Open this post in threaded view
|

Re: 回复: flink数据sink到mysql 是事务处理

godfrey he
hi [hidden email][hidden email]
能详细介绍一下 幂等 处理的场景吗? 例如通过upsert语句能做到幂等

hi [hidden email],
你的sink function的一个statement batch里有insert,delete等语句混合的情况?是用的是flink
sql,还是datastream?

Bests,
Godfrey

Bests,
Godfrey

Michael Ran <[hidden email]> 于2020年6月3日周三 下午8:07写道:

> 我们也会用幂等处理类似的东西。
> 1.你要么单条数据处理
> 2.要么保证每个事务之间不会出现冲突才行,比如楼上说了key by 之类的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-03 15:41:44,"1048262223" <[hidden email]> 写道:
> >Hi
> >&nbsp; &nbsp; &nbsp;
> 可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。
> >
> >
> >Best,
> >Yichao Yang
> >
> >
> >
> >
> >------------------&nbsp;原始邮件&nbsp;------------------
> >发件人:&nbsp;"Px New"<[hidden email]&gt;;
> >发送时间:&nbsp;2020年6月3日(星期三) 中午11:35
> >收件人:&nbsp;"user-zh"<[hidden email]&gt;;
> >
> >主题:&nbsp;Re: flink数据sink到mysql 是事务处理
> >
> >
> >
> >Hi 我最近在处理幂等性写入Mysql, 但相关文档太少, 并没有实质性的操作, 所以方便参考下你这边事务写入的code吗? 非常感谢你
> >也可发code到我的email [hidden email]
> >
> >1101300123 <[hidden email]&gt; 于2020年4月10日周五 上午11:42写道:
> >
> >&gt;
> >&gt;
> >&gt;
> 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端
> >&gt; 多条记录的操作
> >&gt; sink的invoke代码
> >&gt; @Override
> >&gt; public void invoke(Tuple5<String, String, String, String,
> >&gt; List<BroadBandReq&gt;&gt; value, Context context) throws Exception {
> >&gt; connection.setAutoCommit(false);
> >&gt; List<BroadBandReq&gt; f4 = value.f4;
> >&gt; for (BroadBandReq rs: f4){
> >&gt; statement.setString(1,rs.getUserId());
> >&gt; statement.setString(2,rs.getPhoneNum());
> >&gt; statement.setString(3,rs.getProvId());
> >&gt; statement.addBatch();
> >&gt; }
> >&gt; try {
> >&gt; statement.executeBatch();
> >&gt; connection.commit();
> >&gt; }catch (Exception e){
> >&gt; LOG.info(" add data for rds ; operTag:{},
> >&gt; userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0,
> value.f1,
> >&gt; value.f2, value.f3,f4);
> >&gt; connection.rollback();
> >&gt; e.printStackTrace();
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> throw new Exception(e);
> >&gt; }
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp; }
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt; java.lang.Exception: java.sql.BatchUpdateException: Deadlock found
> when
> >&gt; trying to get lock; try restarting transaction
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.streaming.runtime.io
> >&gt;
> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.streaming.runtime.io
> >&gt; .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.streaming.runtime.io
> >&gt;
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> java.lang.Thread.run(Thread.java:748)
> >&gt; Caused by: java.sql.BatchUpdateException: Deadlock found when trying
> to
> >&gt; get lock; try restarting transaction
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >&gt; Method)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.Util.getInstance(Util.java:408)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt; com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 33 more
> >&gt; Caused by:
> >&gt; com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException:
> Deadlock
> >&gt; found when trying to get lock; try restarting transaction
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >&gt; Method)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.Util.getInstance(Util.java:408)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 36 more
> >&gt; [flink-akka.actor.default-dispatcher-783]
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink数据sink到mysql 是事务处理

1101300123
您好:
    我是这样的情况:flink处理完数据后,数据被我组织成一个3元组,第一个元素代表的是对这个元素的操作(插入还是删除),第二个是我的user_ID,第三个是一个list,里面有多个实体;
    数据要插入或者删除,因为list的大小不确定,所以在操作mysql的时候开启了事务,保证这一批次的数据幂等。但是程序运行一段时间后出现死锁,不过不影响数据,所以发邮件想知道大家
    是怎么处理的,结果这么多天才有回复讨论;
    伪代码如下:
   
/**
 * Creates the sink and stores the JDBC connection settings in the
 * corresponding fields. No connection is opened here.
 *
 * @param url      JDBC URL later handed to {@code DriverManager.getConnection}
 * @param name     database user name
 * @param password database password
 */
public RdsOperaterSink2(String url, String name, String password) {
        this.password = password;
        this.name = name;
        this.url = url;
    }

    // Statement most recently prepared by invoke(); stays null until invoke() has run.
    // transient: excluded from Java serialization of this sink instance.
    private transient volatile PreparedStatement statement;
    // JDBC connection created in open() with auto-commit disabled; closed in close().
    private transient volatile Connection connection;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");
        connection = DriverManager.getConnection(url,name,password);
        connection.setAutoCommit(false);
        System.out.println("connection ---" + connection);
    }

    @Override
    public void invoke(Tuple5<String, String, String, String, List<BroadBandReq>> value, Context context) throws Exception {
        List<BroadBandReq> f4 = value.f4;
        String operType = value.f0;
        if(operType.equals(BusiConst.D_OPER_TYPE)){
            statement = connection.prepareStatement(BusiConst.DELETESQL);
//            LOG.info(" delete data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3,f4);
            for (BroadBandReq rs: f4){
                statement.setString(1,rs.getPhoneNum());
                statement.setString(2,rs.getProvId());
                statement.addBatch();
            }
        }else{
            statement = connection.prepareStatement(BusiConst.ADDSQL);
//            LOG.info(" add data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3,f4);
            for (BroadBandReq rs: f4){
                statement.setString(1,rs.getUserId());
                statement.setString(2,rs.getPhoneNum());
                statement.setString(3,rs.getProvId());
                statement.addBatch();
            }
        }
        try {
            statement.executeBatch();
            connection.commit();
        }catch (Exception e){
            LOG.info("  data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3,f4);
            System.out.println("  data for rds :  " +  value);
            System.out.println("invoke  ---" + connection);
            connection.rollback();
            LOG.error(e.getMessage());
            e.printStackTrace();
            throw new Exception(e);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        statement.close();
        connection.close();
    }



[hidden email]
 
发件人: godfrey he
发送时间: 2020-06-03 21:59
收件人: user-zh; greemqqran; 15701181132mr.liu; hdxg1101300123
主题: Re: 回复: flink数据sink到mysql 是事务处理
hi [hidden email][hidden email]
能详细介绍一下 幂等 处理的场景吗? 例如通过upsert语句能做到幂等
 
hi [hidden email],
你的sink function的一个statement batch里有insert,delete等语句混合的情况?是用的是flink
sql,还是datastream?
 
Bests,
Godfrey
 
Bests,
Godfrey
 
Michael Ran <[hidden email]> 于2020年6月3日周三 下午8:07写道:
 

> 我们也会用幂等处理类似的东西。
> 1.你要么单条数据处理
> 2.要么保证每个事务之间不会出现冲突才行,比如楼上说了key by 之类的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-03 15:41:44,"1048262223" <[hidden email]> 写道:
> >Hi
> >&nbsp; &nbsp; &nbsp;
> 可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。
> >
> >
> >Best,
> >Yichao Yang
> >
> >
> >
> >
> >------------------&nbsp;原始邮件&nbsp;------------------
> >发件人:&nbsp;"Px New"<[hidden email]&gt;;
> >发送时间:&nbsp;2020年6月3日(星期三) 中午11:35
> >收件人:&nbsp;"user-zh"<[hidden email]&gt;;
> >
> >主题:&nbsp;Re: flink数据sink到mysql 是事务处理
> >
> >
> >
> >Hi 我最近在处理幂等性写入Mysql, 但相关文档太少, 并没有实质性的操作, 所以方便参考下你这边事务写入的code吗? 非常感谢你
> >也可发code到我的email [hidden email]
> >
> >1101300123 <[hidden email]&gt; 于2020年4月10日周五 上午11:42写道:
> >
> >&gt;
> >&gt;
> >&gt;
> 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端
> >&gt; 多条记录的操作
> >&gt; sink的invoke代码
> >&gt; @Override
> >&gt; public void invoke(Tuple5<String, String, String, String,
> >&gt; List<BroadBandReq&gt;&gt; value, Context context) throws Exception {
> >&gt; connection.setAutoCommit(false);
> >&gt; List<BroadBandReq&gt; f4 = value.f4;
> >&gt; for (BroadBandReq rs: f4){
> >&gt; statement.setString(1,rs.getUserId());
> >&gt; statement.setString(2,rs.getPhoneNum());
> >&gt; statement.setString(3,rs.getProvId());
> >&gt; statement.addBatch();
> >&gt; }
> >&gt; try {
> >&gt; statement.executeBatch();
> >&gt; connection.commit();
> >&gt; }catch (Exception e){
> >&gt; LOG.info(" add data for rds ; operTag:{},
> >&gt; userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0,
> value.f1,
> >&gt; value.f2, value.f3,f4);
> >&gt; connection.rollback();
> >&gt; e.printStackTrace();
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> throw new Exception(e);
> >&gt; }
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp; }
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt; java.lang.Exception: java.sql.BatchUpdateException: Deadlock found
> when
> >&gt; trying to get lock; try restarting transaction
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.streaming.runtime.io
> >&gt;
> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.streaming.runtime.io
> >&gt; .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.streaming.runtime.io
> >&gt;
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> java.lang.Thread.run(Thread.java:748)
> >&gt; Caused by: java.sql.BatchUpdateException: Deadlock found when trying
> to
> >&gt; get lock; try restarting transaction
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >&gt; Method)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.Util.getInstance(Util.java:408)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt; com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 33 more
> >&gt; Caused by:
> >&gt; com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException:
> Deadlock
> >&gt; found when trying to get lock; try restarting transaction
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >&gt; Method)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.Util.getInstance(Util.java:408)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> >&gt;
> com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
> >&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 36 more
> >&gt; [flink-akka.actor.default-dispatcher-783]
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
> >&gt;
>