Flink 写ES ConcurrentModificationException 异常

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 写ES ConcurrentModificationException 异常

王佩-2
用Flink 写ES ConcurrentModificationException 遇到以下异常:

2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Job kafka_to_es (ba125eebbe5d09c7d224c7f2a05143b8) switched from
state RUNNING to FAILING.
java.util.ConcurrentModificationException
        at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
        at java.util.ArrayList$Itr.next(ArrayList.java:851)
        at org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer.processBufferedRequests(BufferingNoOpRequestIndexer.java:64)
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:387)
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
Sink: ElasticsearchSink (1/8) (2c3af5806021a731a36ee7f215237aba)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
Sink: ElasticsearchSink (2/8) (f483a8fc1d22381345545b1e2a6af71b)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
Sink: ElasticsearchSink (7/8) (db6ca246830a34c359d5ce732cc9ea79)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
Sink: ElasticsearchSink (8/8) (097ceb890ce879e3c804772cbe0892dd)
switched from RUNNING to CANCELING.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416)
switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2)
switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948)
switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860)
switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff)
switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a)
switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973)
switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926)
switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed)
switched from CANCELING to CANCELED.
2019-09-10 08:13:14 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d)
switched from CANCELING to CANCELED.

帮忙看下,是不是Flink 内部的问题。
Reply | Threaded
Open this post in threaded view
|

Re: Flink 写ES ConcurrentModificationException 异常

wang jinhai
这不是flink问题吧。你代码遍历List时进行了remove操作,导致这个问题。解决方案是iterator遍历,并iterator.remove即可

在 2019/9/10 下午4:18,“王佩”<[hidden email]> 写入:

    用Flink 写ES ConcurrentModificationException 遇到以下异常:
   
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Job kafka_to_es (ba125eebbe5d09c7d224c7f2a05143b8) switched from
    state RUNNING to FAILING.
    java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
    at java.util.ArrayList$Itr.next(ArrayList.java:851)
    at org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer.processBufferedRequests(BufferingNoOpRequestIndexer.java:64)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:387)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
    Sink: ElasticsearchSink (1/8) (2c3af5806021a731a36ee7f215237aba)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
    Sink: ElasticsearchSink (2/8) (f483a8fc1d22381345545b1e2a6af71b)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
    Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
    Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
    Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
    Sink: ElasticsearchSink (7/8) (db6ca246830a34c359d5ce732cc9ea79)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
    Sink: ElasticsearchSink (8/8) (097ceb890ce879e3c804772cbe0892dd)
    switched from RUNNING to CANCELING.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
    Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416)
    switched from CANCELING to CANCELED.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2)
    switched from CANCELING to CANCELED.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948)
    switched from CANCELING to CANCELED.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860)
    switched from CANCELING to CANCELED.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff)
    switched from CANCELING to CANCELED.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a)
    switched from CANCELING to CANCELED.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973)
    switched from CANCELING to CANCELED.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926)
    switched from CANCELING to CANCELED.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
    Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed)
    switched from CANCELING to CANCELED.
    2019-09-10 08:13:14 INFO
    org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
    1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
    Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d)
    switched from CANCELING to CANCELED.
   
    帮忙看下,是不是Flink 内部的问题。
   
hb
Reply | Threaded
Open this post in threaded view
|

Re:Flink 写ES ConcurrentModificationException 异常

hb
In reply to this post by 王佩-2
这个应该是 并发写es 同一条记录,出现的并发写问题.








在 2019-09-10 15:18:07,"王佩" <[hidden email]> 写道:

>用Flink 写ES ConcurrentModificationException 遇到以下异常:
>
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Job kafka_to_es (ba125eebbe5d09c7d224c7f2a05143b8) switched from
>state RUNNING to FAILING.
>java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer.processBufferedRequests(BufferingNoOpRequestIndexer.java:64)
> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:387)
> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>Sink: ElasticsearchSink (1/8) (2c3af5806021a731a36ee7f215237aba)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>Sink: ElasticsearchSink (2/8) (f483a8fc1d22381345545b1e2a6af71b)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>Sink: ElasticsearchSink (7/8) (db6ca246830a34c359d5ce732cc9ea79)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>Sink: ElasticsearchSink (8/8) (097ceb890ce879e3c804772cbe0892dd)
>switched from RUNNING to CANCELING.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416)
>switched from CANCELING to CANCELED.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2)
>switched from CANCELING to CANCELED.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948)
>switched from CANCELING to CANCELED.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860)
>switched from CANCELING to CANCELED.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff)
>switched from CANCELING to CANCELED.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a)
>switched from CANCELING to CANCELED.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973)
>switched from CANCELING to CANCELED.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926)
>switched from CANCELING to CANCELED.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed)
>switched from CANCELING to CANCELED.
>2019-09-10 08:13:14 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d)
>switched from CANCELING to CANCELED.
>
>帮忙看下,是不是Flink 内部的问题。
Reply | Threaded
Open this post in threaded view
|

Re: Flink 写ES ConcurrentModificationException 异常

王佩-2
In reply to this post by wang jinhai
不是代码的问题,代码里边没有遍历List时进行了remove。看报错是从org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer类里报出来的。
BufferingNoOpRequestIndexer 类不是线程安全的。

wang jinhai <[hidden email]> 于2019年9月10日周二 下午4:36写道:

> 这不是flink问题吧。你代码遍历List时进行了remove操作,导致这个问题。解决方案是iterator遍历,并iterator.remove即可
>
> 在 2019/9/10 下午4:18,“王佩”<[hidden email]> 写入:
>
>     用Flink 写ES ConcurrentModificationException 遇到以下异常:
>
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Job kafka_to_es (ba125eebbe5d09c7d224c7f2a05143b8) switched from
>     state RUNNING to FAILING.
>     java.util.ConcurrentModificationException
>         at
> java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
>         at java.util.ArrayList$Itr.next(ArrayList.java:851)
>         at
> org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer.processBufferedRequests(BufferingNoOpRequestIndexer.java:64)
>         at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:387)
>         at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
>         at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>         at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>         at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>         at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>         at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>         at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:745)
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>     Sink: ElasticsearchSink (1/8) (2c3af5806021a731a36ee7f215237aba)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>     Sink: ElasticsearchSink (2/8) (f483a8fc1d22381345545b1e2a6af71b)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>     Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>     Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>     Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>     Sink: ElasticsearchSink (7/8) (db6ca246830a34c359d5ce732cc9ea79)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>     Sink: ElasticsearchSink (8/8) (097ceb890ce879e3c804772cbe0892dd)
>     switched from RUNNING to CANCELING.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>     Sink: ElasticsearchSink (6/8) (67d3f14fdca5ef94caad1c7a2d866416)
>     switched from CANCELING to CANCELED.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (4/7) (373e5417ba6514af74c874a9526bc8e2)
>     switched from CANCELING to CANCELED.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (7/7) (1eb82cf791c608a4b3d8f8e15aaf6948)
>     switched from CANCELING to CANCELED.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860)
>     switched from CANCELING to CANCELED.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (5/7) (d506cbef09c1ab7441543d03f43875ff)
>     switched from CANCELING to CANCELED.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (2/7) (1d4b9af5ec8602a1930bd3adaf593c6a)
>     switched from CANCELING to CANCELED.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (6/7) (a84920b7c44801585ba970de08c09973)
>     switched from CANCELING to CANCELED.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 Source: KafkaSource (3/7) (008557eedfe304b378eb9403646d2926)
>     switched from CANCELING to CANCELED.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>     Sink: ElasticsearchSink (3/8) (3fd4f4b9955666adfb58e9243f4f84ed)
>     switched from CANCELING to CANCELED.
>     2019-09-10 08:13:14 INFO
>     org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
>     1417 ToJSON -> FilterNull -> ExtExtractor(Plugin) -> FilterNull ->
>     Sink: ElasticsearchSink (4/8) (5b049f52e2742557603683889c348e9d)
>     switched from CANCELING to CANCELED.
>
>     帮忙看下,是不是Flink 内部的问题。
>
>