目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端 多条记录的操作 sink的invoke代码 @Override public void invoke(Tuple5<String, String, String, String, List<BroadBandReq>> value, Context context) throws Exception { connection.setAutoCommit(false); List<BroadBandReq> f4 = value.f4; for (BroadBandReq rs: f4){ statement.setString(1,rs.getUserId()); statement.setString(2,rs.getPhoneNum()); statement.setString(3,rs.getProvId()); statement.addBatch(); } try { statement.executeBatch(); connection.commit(); }catch (Exception e){ LOG.info(" add data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3,f4); connection.rollback(); e.printStackTrace(); throw new Exception(e); } } java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73) at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736) at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162) at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Caused by: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) at com.mysql.jdbc.Util.getInstance(Util.java:408) at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163) at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772) at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262) at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67) ... 
33 more Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) at com.mysql.jdbc.Util.getInstance(Util.java:408) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952) at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976) at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912) at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530) at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858) at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079) at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756) ... 36 more [flink-akka.actor.default-dispatcher-783] |
Hi 我最近在处理幂等性写入MySQL,但相关文档太少,并没有实质性的操作说明,所以方便参考下你这边事务写入的code吗? 非常感谢你
也可发code到我的email [hidden email]😀 1101300123 <[hidden email]> 于2020年4月10日周五 上午11:42写道: > > > 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端 > 多条记录的操作 > sink的invoke代码 > @Override > public void invoke(Tuple5<String, String, String, String, > List<BroadBandReq>> value, Context context) throws Exception { > connection.setAutoCommit(false); > List<BroadBandReq> f4 = value.f4; > for (BroadBandReq rs: f4){ > statement.setString(1,rs.getUserId()); > statement.setString(2,rs.getPhoneNum()); > statement.setString(3,rs.getProvId()); > statement.addBatch(); > } > try { > statement.executeBatch(); > connection.commit(); > }catch (Exception e){ > LOG.info(" add data for rds ; operTag:{}, > userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, > value.f2, value.f3,f4); > connection.rollback(); > e.printStackTrace(); > throw new Exception(e); > } > } > > > > > java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when > trying to get lock; try restarting transaction > at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73) > at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) > at > 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736) > at > org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) > at > com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162) > at > com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157) > at > org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) > at org.apache.flink.streaming.runtime.io > .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) > at org.apache.flink.streaming.runtime.io > 
.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) > at org.apache.flink.streaming.runtime.io > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.sql.BatchUpdateException: Deadlock found when trying to > get lock; try restarting transaction > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) > at com.mysql.jdbc.Util.getInstance(Util.java:408) > at > com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163) > at > com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772) > at > com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262) > at > com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) > at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67) > ... 
33 more > Caused by: > com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock > found when trying to get lock; try restarting transaction > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) > at com.mysql.jdbc.Util.getInstance(Util.java:408) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952) > at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976) > at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912) > at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530) > at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683) > at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) > at > com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858) > at > com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079) > at > com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756) > ... 36 more > [flink-akka.actor.default-dispatcher-783] > > > > > > > > > > > > |
Hi
可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。 Best, Yichao Yang ------------------ 原始邮件 ------------------ 发件人: "Px New"<[hidden email]>; 发送时间: 2020年6月3日(星期三) 中午11:35 收件人: "user-zh"<[hidden email]>; 主题: Re: flink数据sink到mysql 是事务处理 Hi 我最近在处理幂等性写入Mysql 但相关文档太少并没有实质性的操作, 所有方便参观下你这边实务写入的code吗? 非常感谢你 也可发code到我的email [hidden email]😀 1101300123 <[hidden email]> 于2020年4月10日周五 上午11:42写道: > > > 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端 > 多条记录的操作 > sink的invoke代码 > @Override > public void invoke(Tuple5<String, String, String, String, > List<BroadBandReq>> value, Context context) throws Exception { > connection.setAutoCommit(false); > List<BroadBandReq> f4 = value.f4; > for (BroadBandReq rs: f4){ > statement.setString(1,rs.getUserId()); > statement.setString(2,rs.getPhoneNum()); > statement.setString(3,rs.getProvId()); > statement.addBatch(); > } > try { > statement.executeBatch(); > connection.commit(); > }catch (Exception e){ > LOG.info(" add data for rds ; operTag:{}, > userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, > value.f2, value.f3,f4); > connection.rollback(); > e.printStackTrace(); > throw new Exception(e); > } > } > > > > > java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when > trying to get lock; try restarting transaction > at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73) > at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736) > at > org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) > at > com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162) > at > com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157) > at > org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) > at > 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) > at org.apache.flink.streaming.runtime.io > .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) > at org.apache.flink.streaming.runtime.io > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) > at org.apache.flink.streaming.runtime.io > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.sql.BatchUpdateException: Deadlock found when trying to > get lock; try restarting transaction > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) > at com.mysql.jdbc.Util.getInstance(Util.java:408) > at > com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163) > at > com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772) > at > com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262) > at > 
com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) > at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67) > ... 33 more > Caused by: > com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock > found when trying to get lock; try restarting transaction > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) > at com.mysql.jdbc.Util.getInstance(Util.java:408) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952) > at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976) > at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912) > at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530) > at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683) > at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) > at > com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858) > at > com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079) > at > com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756) > ... 36 more > [flink-akka.actor.default-dispatcher-783] > > > > > > > > > > > > |
我们也会用幂等处理类似的东西。
1.你要么单条数据处理 2.要么保证每个事务之间不会出现冲突才行,比如楼上说了key by 之类的 在 2020-06-03 15:41:44,"1048262223" <[hidden email]> 写道: >Hi > 可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。 > > >Best, >Yichao Yang > > > > >------------------ 原始邮件 ------------------ >发件人: "Px New"<[hidden email]>; >发送时间: 2020年6月3日(星期三) 中午11:35 >收件人: "user-zh"<[hidden email]>; > >主题: Re: flink数据sink到mysql 是事务处理 > > > >Hi 我最近在处理幂等性写入Mysql 但相关文档太少并没有实质性的操作, 所有方便参观下你这边实务写入的code吗? 非常感谢你 >也可发code到我的email [hidden email] > >1101300123 <[hidden email]> 于2020年4月10日周五 上午11:42写道: > >> >> >> 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端 >> 多条记录的操作 >> sink的invoke代码 >> @Override >> public void invoke(Tuple5<String, String, String, String, >> List<BroadBandReq>> value, Context context) throws Exception { >> connection.setAutoCommit(false); >> List<BroadBandReq> f4 = value.f4; >> for (BroadBandReq rs: f4){ >> statement.setString(1,rs.getUserId()); >> statement.setString(2,rs.getPhoneNum()); >> statement.setString(3,rs.getProvId()); >> statement.addBatch(); >> } >> try { >> statement.executeBatch(); >> connection.commit(); >> }catch (Exception e){ >> LOG.info(" add data for rds ; operTag:{}, >> userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, >> value.f2, value.f3,f4); >> connection.rollback(); >> e.printStackTrace(); >> throw new Exception(e); >> } >> } >> >> >> >> >> java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when >> trying to get lock; try restarting transaction >> at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73) >> at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18) >> at >> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) >> at >> 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) >> at >> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736) >> at >> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) >> at >> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162) >> at >> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157) >> at >> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) >> at >> 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) >> at >> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) >> at >> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) >> at org.apache.flink.streaming.runtime.io >> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) >> at org.apache.flink.streaming.runtime.io >> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) >> at org.apache.flink.streaming.runtime.io >> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) >> at >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: java.sql.BatchUpdateException: Deadlock found when trying to >> get lock; try restarting transaction >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native >> Method) >> at >> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >> at >> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) >> at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) >> at com.mysql.jdbc.Util.getInstance(Util.java:408) >> at >> com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163) >> at >> 
com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772) >> at >> com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262) >> at >> com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) >> at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67) >> ... 33 more >> Caused by: >> com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock >> found when trying to get lock; try restarting transaction >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native >> Method) >> at >> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >> at >> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) >> at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) >> at com.mysql.jdbc.Util.getInstance(Util.java:408) >> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952) >> at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976) >> at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912) >> at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530) >> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683) >> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) >> at >> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858) >> at >> com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079) >> at >> com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756) >> ... 36 more >> [flink-akka.actor.default-dispatcher-783] >> >> >> >> >> >> >> >> >> >> >> >> |
hi [hidden email],[hidden email]
能详细介绍一下 幂等 处理的场景吗? 例如通过upsert语句能做到幂等 hi [hidden email], 你的sink function的一个statement batch里有insert,delete等语句混合的情况?是用的是flink sql,还是datastream? Bests, Godfrey Bests, Godfrey Michael Ran <[hidden email]> 于2020年6月3日周三 下午8:07写道: > 我们也会用幂等处理类似的东西。 > 1.你要么单条数据处理 > 2.要么保证每个事务之间不会出现冲突才行,比如楼上说了key by 之类的 > > > > > > > > > > > > > > > > > > 在 2020-06-03 15:41:44,"1048262223" <[hidden email]> 写道: > >Hi > > > 可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。 > > > > > >Best, > >Yichao Yang > > > > > > > > > >------------------ 原始邮件 ------------------ > >发件人: "Px New"<[hidden email]>; > >发送时间: 2020年6月3日(星期三) 中午11:35 > >收件人: "user-zh"<[hidden email]>; > > > >主题: Re: flink数据sink到mysql 是事务处理 > > > > > > > >Hi 我最近在处理幂等性写入Mysql 但相关文档太少并没有实质性的操作, 所有方便参观下你这边实务写入的code吗? 非常感谢你 > >也可发code到我的email [hidden email] > > > >1101300123 <[hidden email]> 于2020年4月10日周五 上午11:42写道: > > > >> > >> > >> > 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端 > >> 多条记录的操作 > >> sink的invoke代码 > >> @Override > >> public void invoke(Tuple5<String, String, String, String, > >> List<BroadBandReq>> value, Context context) throws Exception { > >> connection.setAutoCommit(false); > >> List<BroadBandReq> f4 = value.f4; > >> for (BroadBandReq rs: f4){ > >> statement.setString(1,rs.getUserId()); > >> statement.setString(2,rs.getPhoneNum()); > >> statement.setString(3,rs.getProvId()); > >> statement.addBatch(); > >> } > >> try { > >> statement.executeBatch(); > >> connection.commit(); > >> }catch (Exception e){ > >> LOG.info(" add data for rds ; operTag:{}, > >> userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, > value.f1, > >> value.f2, value.f3,f4); > >> connection.rollback(); > >> e.printStackTrace(); > >> > throw new Exception(e); > >> } > >> } > >> > >> > >> > >> > >> java.lang.Exception: java.sql.BatchUpdateException: Deadlock found > when > >> trying to get 
lock; try restarting transaction > >> at > com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73) > >> at > com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18) > >> at > >> > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) > >> at > >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) > >> at > >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) > >> at > >> > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718) > >> at > >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736) > >> at > >> > org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) > >> at > >> > com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162) > >> at > >> > com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157) > >> at > >> > org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) > >> 
at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) > >> at > >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) > >> at > >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) > >> at > >> > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) > >> at > org.apache.flink.streaming.runtime.io > >> > .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) > >> at > org.apache.flink.streaming.runtime.io > >> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) > >> at > org.apache.flink.streaming.runtime.io > >> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) > >> at > >> > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) > >> at > >> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) > >> at > >> > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) > >> at > >> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > >> at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > >> at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > >> at > java.lang.Thread.run(Thread.java:748) > >> Caused by: java.sql.BatchUpdateException: Deadlock found when trying > to > >> get lock; try restarting transaction > 
>> at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > >> Method) > >> at > >> > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > >> at > >> > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > >> at > java.lang.reflect.Constructor.newInstance(Constructor.java:423) > >> at > com.mysql.jdbc.Util.handleNewInstance(Util.java:425) > >> at > com.mysql.jdbc.Util.getInstance(Util.java:408) > >> at > >> > com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163) > >> at > >> > com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772) > >> at > >> > com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262) > >> at > >> com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) > >> at > com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67) > >> ... 33 more > >> Caused by: > >> com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: > Deadlock > >> found when trying to get lock; try restarting transaction > >> at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > >> Method) > >> at > >> > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > >> at > >> > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > >> at > java.lang.reflect.Constructor.newInstance(Constructor.java:423) > >> at > com.mysql.jdbc.Util.handleNewInstance(Util.java:425) > >> at > com.mysql.jdbc.Util.getInstance(Util.java:408) > >> at > com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952) > >> at > com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976) > >> at > com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912) > >> at > com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530) > >> at > com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683) > >> at > 
com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) > >> at > >> > com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858) > >> at > >> > com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079) > >> at > >> > com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756) > >> ... 36 more > >> [flink-akka.actor.default-dispatcher-783] > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > |
您好:
我是这样的情况,flink处理完数据后数据被我组织成一个3元组,第一个元素代表的是对这个元素的操作(插入还是删除),第二个是我的user_ID,第三个是一个list里面有多个实体; 数据要插入或者删除,因为list的大小不确定所以在操作mysql的时候开启了事务保证这一批次的数据幂等,但是程序运行一段时间后出现死锁,但是不影响数据,所以发邮件想知道大家是怎么处理的,结果这么多天才有回复讨论; 伪代码如下: public RdsOperaterSink2(String url, String name, String password) { this.url = url; this.name = name; this.password = password; } private transient volatile PreparedStatement statement; private transient volatile Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName("com.mysql.jdbc.Driver"); connection = DriverManager.getConnection(url,name,password); connection.setAutoCommit(false); System.out.println("connection ---" + connection); } @Override public void invoke(Tuple5<String, String, String, String, List<BroadBandReq>> value, Context context) throws Exception { List<BroadBandReq> f4 = value.f4; String operType = value.f0; if(operType.equals(BusiConst.D_OPER_TYPE)){ statement = connection.prepareStatement(BusiConst.DELETESQL); // LOG.info(" delete data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3,f4); for (BroadBandReq rs: f4){ statement.setString(1,rs.getPhoneNum()); statement.setString(2,rs.getProvId()); statement.addBatch(); } }else{ statement = connection.prepareStatement(BusiConst.ADDSQL); // LOG.info(" add data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3,f4); for (BroadBandReq rs: f4){ statement.setString(1,rs.getUserId()); statement.setString(2,rs.getPhoneNum()); statement.setString(3,rs.getProvId()); statement.addBatch(); } } try { statement.executeBatch(); connection.commit(); }catch (Exception e){ LOG.info(" data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3,f4); System.out.println(" data for rds : " + value); System.out.println("invoke ---" + connection); 
connection.rollback(); LOG.error(e.getMessage()); e.printStackTrace(); throw new Exception(e); } } @Override public void close() throws Exception { super.close(); statement.close(); connection.close(); } [hidden email] 发件人: godfrey he 发送时间: 2020-06-03 21:59 收件人: user-zh; greemqqran; 15701181132mr.liu; hdxg1101300123 主题: Re: 回复: flink数据sink到mysql 是事务处理 hi [hidden email],[hidden email] 能详细介绍一下 幂等 处理的场景吗? 例如通过upsert语句能做到幂等 hi [hidden email], 你的sink function的一个statement batch里有insert,delete等语句混合的情况?是用的是flink sql,还是datastream? Bests, Godfrey Bests, Godfrey Michael Ran <[hidden email]> 于2020年6月3日周三 下午8:07写道: > 我们也会用幂等处理类似的东西。 > 1.你要么单条数据处理 > 2.要么保证每个事务之间不会出现冲突才行,比如楼上说了key by 之类的 > > > > > > > > > > > > > > > > > > 在 2020-06-03 15:41:44,"1048262223" <[hidden email]> 写道: > >Hi > > > 可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。 > > > > > >Best, > >Yichao Yang > > > > > > > > > >------------------ 原始邮件 ------------------ > >发件人: "Px New"<[hidden email]>; > >发送时间: 2020年6月3日(星期三) 中午11:35 > >收件人: "user-zh"<[hidden email]>; > > > >主题: Re: flink数据sink到mysql 是事务处理 > > > > > > > >Hi 我最近在处理幂等性写入Mysql 但相关文档太少并没有实质性的操作, 所有方便参观下你这边实务写入的code吗? 
非常感谢你 > >也可发code到我的email [hidden email] > > > >1101300123 <[hidden email]> 于2020年4月10日周五 上午11:42写道: > > > >> > >> > >> > 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端 > >> 多条记录的操作 > >> sink的invoke代码 > >> @Override > >> public void invoke(Tuple5<String, String, String, String, > >> List<BroadBandReq>> value, Context context) throws Exception { > >> connection.setAutoCommit(false); > >> List<BroadBandReq> f4 = value.f4; > >> for (BroadBandReq rs: f4){ > >> statement.setString(1,rs.getUserId()); > >> statement.setString(2,rs.getPhoneNum()); > >> statement.setString(3,rs.getProvId()); > >> statement.addBatch(); > >> } > >> try { > >> statement.executeBatch(); > >> connection.commit(); > >> }catch (Exception e){ > >> LOG.info(" add data for rds ; operTag:{}, > >> userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, > value.f1, > >> value.f2, value.f3,f4); > >> connection.rollback(); > >> e.printStackTrace(); > >> > throw new Exception(e); > >> } > >> } > >> > >> > >> > >> > >> java.lang.Exception: java.sql.BatchUpdateException: Deadlock found > when > >> trying to get lock; try restarting transaction > >> at > com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73) > >> at > com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18) > >> at > >> > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) > >> at > >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) > >> at > 
>> > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) > >> at > >> > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718) > >> at > >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736) > >> at > >> > org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) > >> at > >> > com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162) > >> at > >> > com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157) > >> at > >> > org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) > >> at > >> > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) > >> at > >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) > >> at > >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) > >> at > >> > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > >> at > >> > 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) > >> at > org.apache.flink.streaming.runtime.io > >> > .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) > >> at > org.apache.flink.streaming.runtime.io > >> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) > >> at > org.apache.flink.streaming.runtime.io > >> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) > >> at > >> > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) > >> at > >> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) > >> at > >> > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) > >> at > >> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > >> at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > >> at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > >> at > java.lang.Thread.run(Thread.java:748) > >> Caused by: java.sql.BatchUpdateException: Deadlock found when trying > to > >> get lock; try restarting transaction > >> at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > >> Method) > >> at > >> > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > >> at > >> > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > >> at > java.lang.reflect.Constructor.newInstance(Constructor.java:423) > >> at > com.mysql.jdbc.Util.handleNewInstance(Util.java:425) > >> at > com.mysql.jdbc.Util.getInstance(Util.java:408) > >> at > >> > com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163) > >> at > >> > com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772) > >> at > >> > 
com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262) > >> at > >> com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) > >> at > com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67) > >> ... 33 more > >> Caused by: > >> com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: > Deadlock > >> found when trying to get lock; try restarting transaction > >> at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > >> Method) > >> at > >> > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > >> at > >> > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > >> at > java.lang.reflect.Constructor.newInstance(Constructor.java:423) > >> at > com.mysql.jdbc.Util.handleNewInstance(Util.java:425) > >> at > com.mysql.jdbc.Util.getInstance(Util.java:408) > >> at > com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952) > >> at > com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976) > >> at > com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912) > >> at > com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530) > >> at > com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683) > >> at > com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) > >> at > >> > com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858) > >> at > >> > com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079) > >> at > >> > com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756) > >> ... 36 more > >> [flink-akka.actor.default-dispatcher-783] > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > |
Free forum by Nabble | Edit this page |