When I used version 1.9.0 of flink-connector-elasticsearch6_2.11 to write data into Elasticsearch, I ran into a ConcurrentModificationException.

My failure handling works like this: whenever a request fails, it is added to the failure list and waits to be retried.

    // Each time an asynchronous bulk response comes back, the sink checks whether the requests
    // it sent succeeded; the method below is called for every request that failed.
    public class ElasticSearchRequestFailureHandler implements ActionRequestFailureHandler {
        @Override
        public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
            indexer.add(action);
        }
    }

However, it threw the exception below.

Looking at the code: ElasticsearchSinkBase's open() method constructs failureRequestIndexer = new BufferingNoOpRequestIndexer();. This class is really just a list that holds the failed requests, and that single list is used globally to store them.

Meanwhile, ElasticsearchSinkBase's invoke() method calls BufferingNoOpRequestIndexer's processBufferedRequests() method, which loops over that list:

    void processBufferedRequests(RequestIndexer actualIndexer) {
        for (ActionRequest request : bufferedRequests) {
            if (request instanceof IndexRequest) {
                actualIndexer.add((IndexRequest) request);
            } else if (request instanceof DeleteRequest) {
                actualIndexer.add((DeleteRequest) request);
            } else if (request instanceof UpdateRequest) {
                actualIndexer.add((UpdateRequest) request);
            }
        }
        bufferedRequests.clear();
    }

But afterBulk keeps adding elements to this same list while this loop is iterating over it, so of course that produces a ConcurrentModificationException, doesn't it? Isn't this an obvious multithreading bug?
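The fail-fast behavior described above can be checked without Flink or Elasticsearch. ArrayList's iterator records the list's modification count when it is created and rechecks it on every next() call (the checkForComodification frame at the top of the trace), so a structural change made during the loop makes the following next() throw. The sketch below is a single-threaded stand-in with a hypothetical class name, FailFastDemo: adding to the list from inside the loop plays the role of afterBulk adding a failed request while processBufferedRequests is iterating. Across threads the check is only best-effort (as the ArrayList Javadoc states), so unsynchronized access is not guaranteed to throw at all; for example, a request added after the loop ends but before clear() runs would simply be discarded.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo class, not part of Flink.
public class FailFastDemo {

    // Returns true if adding to an ArrayList while iterating over it triggers a CME.
    static boolean addDuringIteration() {
        List<String> bufferedRequests = new ArrayList<>();
        bufferedRequests.add("request-1");
        bufferedRequests.add("request-2");
        try {
            for (String request : bufferedRequests) {
                // Stands in for afterBulk re-buffering a failed request mid-iteration.
                bufferedRequests.add("retry-of-" + request);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // Thrown by ArrayList$Itr.checkForComodification on the next call to next().
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ConcurrentModificationException thrown: " + addDuringIteration());
    }
}
```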
Caused by: java.util.ConcurrentModificationException
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
	at java.util.ArrayList$Itr.next(ArrayList.java:859)
	at org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer.processBufferedRequests(BufferingNoOpRequestIndexer.java:64)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:387)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
	at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
	at DataStreamCalcRule$644.processElement(Unknown Source)
	at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
	at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector.collect(CRowWrappingMultiOutputCollector.scala:57)
	at org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector.collect(CRowWrappingMultiOutputCollector.scala:29)
	at DataStreamJoinRule$581.join(Unknown Source)
	at org.apache.flink.table.runtime.join.NonWindowJoin.callJoinFunction(NonWindowJoin.scala:224)
	at org.apache.flink.table.runtime.join.NonWindowInnerJoin.processElement(NonWindowInnerJoin.scala:83)
	at org.apache.flink.table.runtime.join.NonWindowJoin.processElement1(NonWindowJoin.scala:115)
	at org.apache.flink.table.runtime.join.NonWindowJoin.processElement1(NonWindowJoin.scala:46)
	at org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator.processElement1(LegacyKeyedCoProcessOperator.java:81)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:259)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
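One common way to remove this kind of race is to buffer into a concurrent queue and drain it with poll() instead of iterating and then calling clear(). The sketch below is hypothetical and is not Flink's code or an official fix: the class name ConcurrentFailureBuffer is made up, a generic T stands in for ActionRequest, and a java.util.function.Consumer stands in for the RequestIndexer that receives the re-queued requests. The idea is that add() would be called from the bulk-callback side and drainTo() from invoke().

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical thread-safe replacement for a list-backed failure buffer (not Flink API).
public class ConcurrentFailureBuffer<T> {

    // Lock-free queue: safe for one thread to add while another thread drains.
    private final Queue<T> buffered = new ConcurrentLinkedQueue<>();

    // Would be called from the bulk-callback thread (the failure handler path).
    public void add(T request) {
        buffered.add(request);
    }

    // Would be called from the task thread. Each poll() removes exactly one element,
    // so nothing is processed twice and nothing added concurrently is thrown away:
    // it is either drained in this call or left in the queue for the next one.
    public int drainTo(Consumer<? super T> actualIndexer) {
        int drained = 0;
        T request;
        while ((request = buffered.poll()) != null) {
            actualIndexer.accept(request);
            drained++;
        }
        return drained;
    }

    public int size() {
        return buffered.size();
    }

    public static void main(String[] args) {
        ConcurrentFailureBuffer<String> buffer = new ConcurrentFailureBuffer<>();
        buffer.add("failed-request-1");
        buffer.add("failed-request-2");
        int n = buffer.drainTo(r -> System.out.println("re-indexing " + r));
        System.out.println("drained " + n);
    }
}
```

Draining with poll() matters even with a concurrent collection: iterating a ConcurrentLinkedQueue is weakly consistent and never throws ConcurrentModificationException, but an iterate-then-clear() loop could still discard a request added after the iterator had passed the end of the queue.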