Writing to ES with Flink, I ran into the following ConcurrentModificationException:

2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Job kafka_to_es (ba125eebbe5d09c7d224c7f2a05143b8) switched from state RUNNING to FAILING.
java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
    at java.util.ArrayList$Itr.next(ArrayList.java:851)
    at org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer.processBufferedRequests(BufferingNoOpRequestIndexer.java:64)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:387)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (1/8) (2c3af5806021a731a36ee7f215237aba) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (2/8) (f483a8fc1d22381345545b1e2a6af71b) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (7/8) (db6ca246830a34c359d5ce732cc9ea79) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (8/8) (097ceb890ce879e3c804772cbe0892dd) switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416) switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2) switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948) switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860) switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff) switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a) switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973) switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926) switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed) switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d) switched from CANCELING to CANCELED.

Could you help take a look at whether this is an internal Flink problem?
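I don't think this is a Flink problem. Your code removes elements from a List while iterating over it, which causes this exception. The fix is to iterate with an Iterator and remove elements through iterator.remove().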
On 2019/9/10 at 4:18 PM, "王佩" <[hidden email]> wrote:
> Writing to ES with Flink, I ran into the following ConcurrentModificationException:
> [...]
> Could you help take a look at whether this is an internal Flink problem?
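For readers unfamiliar with the pattern named in the reply above (removing from a List while iterating over it), here is a minimal sketch in plain Java. The class and method names (IteratorRemoveDemo, removeInForEachThrows, removeWithIterator) are made up for illustration and are not taken from the poster's job or from Flink. It only shows how the JDK's fail-fast ArrayList iterator behaves; it does not establish that this is what happened in the job above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

public class IteratorRemoveDemo {

    // Removes empty strings from inside a for-each loop. The loop's hidden
    // iterator detects the structural change on its next call to next() and
    // throws. Returns true if the exception was thrown.
    // Note: if the removed element is the second-to-last one, the loop ends
    // silently instead, so the failure depends on the data.
    public static boolean removeInForEachThrows(List<String> source) {
        List<String> items = new ArrayList<>(source);
        try {
            for (String item : items) {
                if (item.isEmpty()) {
                    items.remove(item);
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Same filtering, but removal goes through the iterator itself, which
    // keeps the iterator's bookkeeping in sync with the list.
    public static List<String> removeWithIterator(List<String> source) {
        List<String> items = new ArrayList<>(source);
        Iterator<String> it = items.iterator();
        while (it.hasNext()) {
            if (it.next().isEmpty()) {
                it.remove();
            }
        }
        return items;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "", "b", "c");
        System.out.println(removeInForEachThrows(input));
        System.out.println(removeWithIterator(input));
    }
}
```

Note that iterator.remove() only helps when the iteration and the modification happen on the same thread. It does not protect a list that a second thread modifies during the iteration.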
In reply to this post by 王佩-2
This is probably a concurrent-write problem caused by writing the same ES record concurrently.
On 2019-09-10 15:18:07, "王佩" <[hidden email]> wrote:
>Writing to ES with Flink, I ran into the following ConcurrentModificationException:
>[...]
>Could you help take a look at whether this is an internal Flink problem?
In reply to this post by wang jinhai
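It is not a problem in my code; nothing in my code removes from a List while iterating over it. Judging by the stack trace, the exception is thrown from inside the org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer class.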
The BufferingNoOpRequestIndexer class is not thread-safe.

On Tue, Sep 10, 2019 at 4:36 PM, wang jinhai <[hidden email]> wrote:

> This doesn't look like a Flink problem. Your code performed a remove while iterating over a List, which causes this exception. The fix is to iterate with an Iterator and call iterator.remove().
>
> On 2019/9/10 at 4:18 PM, "王佩" <[hidden email]> wrote:
>
> Writing to ES with Flink, I ran into the following ConcurrentModificationException:
>
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Job kafka_to_es (ba125eebbe5d09c7d224c7f2a05143b8) switched from state RUNNING to FAILING.
> java.util.ConcurrentModificationException
>     at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
>     at java.util.ArrayList$Itr.next(ArrayList.java:851)
>     at org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer.processBufferedRequests(BufferingNoOpRequestIndexer.java:64)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:387)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:745)
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (1/8) (2c3af5806021a731a36ee7f215237aba) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (2/8) (f483a8fc1d22381345545b1e2a6af71b) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (7/8) (db6ca246830a34c359d5ce732cc9ea79) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (8/8) (097ceb890ce879e3c804772cbe0892dd) switched from RUNNING to CANCELING.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416) switched from CANCELING to CANCELED.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2) switched from CANCELING to CANCELED.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948) switched from CANCELING to CANCELED.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860) switched from CANCELING to CANCELED.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff) switched from CANCELING to CANCELED.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a) switched from CANCELING to CANCELED.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973) switched from CANCELING to CANCELED.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926) switched from CANCELING to CANCELED.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed) switched from CANCELING to CANCELED.
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull -> Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d) switched from CANCELING to CANCELED.
>
> Could you help take a look at whether this is a problem inside Flink?
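
To make the two explanations in this thread concrete, here is a minimal plain-Java sketch. It is not Flink's code: the class `BufferIterationDemo`, its methods, and `SynchronizedBuffer` are hypothetical names used only for illustration. It shows (1) how an `ArrayList` iterator throws `ConcurrentModificationException` when the list is structurally modified mid-iteration, which is the `ArrayList$Itr.next` failure in the trace, (2) the iterator-based removal suggested above for the single-threaded case, and (3) one possible way to guard a buffer that several threads touch, assuming the lack of thread safety is the actual cause.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Hypothetical demo class; none of these names come from Flink.
class BufferIterationDemo {

    // Adds to the list while a for-each loop is iterating over it.
    // Returns true if the fail-fast iterator detected the modification.
    public static boolean addWhileIterating(List<String> buffer) {
        try {
            for (String request : buffer) {
                if (request.startsWith("retry")) {
                    buffer.add("again-" + request); // structural change mid-iteration
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Single-threaded fix: remove elements only through the iterator itself.
    public static List<String> drainWithIterator(List<String> buffer) {
        List<String> sent = new ArrayList<>();
        Iterator<String> it = buffer.iterator();
        while (it.hasNext()) {
            sent.add(it.next());
            it.remove();
        }
        return sent;
    }

    // Multi-threaded sketch: every access holds the same lock, and callers
    // iterate over a private snapshot instead of the shared list.
    public static final class SynchronizedBuffer {
        private final List<String> pending = new ArrayList<>();

        public synchronized void add(String request) {
            pending.add(request);
        }

        public synchronized List<String> drain() {
            List<String> snapshot = new ArrayList<>(pending);
            pending.clear();
            return snapshot;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> buffer = new ArrayList<>(Arrays.asList("retry-1", "index-2"));
        System.out.println("CME thrown: " + addWhileIterating(buffer));

        SynchronizedBuffer shared = new SynchronizedBuffer();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                shared.add("r" + i);
            }
        });
        writer.start();
        int drained = 0;
        while (writer.isAlive()) {
            drained += shared.drain().size(); // safe while the writer keeps adding
        }
        writer.join();
        drained += shared.drain().size();
        System.out.println("Drained: " + drained);
    }
}
```

Note that `iterator.remove()` only helps when a single thread both iterates and modifies the list. If another thread (for example a callback) appends to the same list, the iteration and the append need a common lock or a concurrent collection, which is what the snapshot-under-lock variant sketches.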