Caused by: java.util.ConcurrentModificationException 异常

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Caused by: java.util.ConcurrentModificationException 异常

淘宝龙安
当我用flink-connector-elasticsearch6_2.11的1.9.0版本,将数据写入elaticsearch的时候,遇到了
ConcurrentModificationException
.

我的错误是这么处理的,每当失败的时候,就加入到失败列表里,等着重试

//每当bluk异步响应回来的时候,都会检查这次发送出去的有没有成功,未成功的就会调用下面的方法
public class ElasticSearchRequestFailureHandler implements
ActionRequestFailureHandler {

 @Override
    public void onFailure(ActionRequest action, Throwable failure, int
restStatusCode, RequestIndexer indexer) throws Throwable {

            indexer.add(action);

    }
}

但是,报了下面的异常。

看了下代码,ElasticsearchSinkBase的open的时候,构造了failureRequestIndexer = new
BufferingNoOpRequestIndexer();而这个类其实就是个list,用来保存失败的request
全局都是用的这个一个list来保存数据。

而在 ElasticsearchSinkBase这类的invoke方法里调用了
BufferingNoOpRequestIndexer的processBufferedRequests这个方法,这方法会循环这个列表。

void processBufferedRequests(RequestIndexer actualIndexer) {
for (ActionRequest request : bufferedRequests) {
if (request instanceof IndexRequest) {
actualIndexer.add((IndexRequest) request);
} else if (request instanceof DeleteRequest) {
actualIndexer.add((DeleteRequest) request);
} else if (request instanceof UpdateRequest) {
actualIndexer.add((UpdateRequest) request);
}
}
bufferedRequests.clear();
}

而在afterBulk的时候又在不断的往这个list里添加元素,这里又在循环,所以不就报了ConcurrentModificationException错误吗?
这不是很明显的一个多线程bug吗?



Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at
org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer.processBufferedRequests(BufferingNoOpRequestIndexer.java:64)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:387)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamCalcRule$644.processElement(Unknown Source)
at
org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
at
org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector.collect(CRowWrappingMultiOutputCollector.scala:57)
at
org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector.collect(CRowWrappingMultiOutputCollector.scala:29)
at DataStreamJoinRule$581.join(Unknown Source)
at
org.apache.flink.table.runtime.join.NonWindowJoin.callJoinFunction(NonWindowJoin.scala:224)
at
org.apache.flink.table.runtime.join.NonWindowInnerJoin.processElement(NonWindowInnerJoin.scala:83)
at
org.apache.flink.table.runtime.join.NonWindowJoin.processElement1(NonWindowJoin.scala:115)
at
org.apache.flink.table.runtime.join.NonWindowJoin.processElement1(NonWindowJoin.scala:46)
at
org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator.processElement1(LegacyKeyedCoProcessOperator.java:81)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:259)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)