version: 1.8.3
graph: source -> map -> sink
Scenario: a source subtask failure causes the graph to restart, but the exception displayed on the Flink UI is not the cause of the task failure.

Displayed:

JM log:
2020-06-22 14:29:01.087 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job baseInfoAdapter_20601 (20601159280210484110080369520601) switched from state RUNNING to FAILING.
java.lang.Exception: Could not perform checkpoint 87 for operator Sink: adapterOutput (19/30).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:597)
    at org.apache.flink.streaming.runtime.io.BarrierTracker.notifyCheckpoint(BarrierTracker.java:270)
    at org.apache.flink.streaming.runtime.io.BarrierTracker.processBarrier(BarrierTracker.java:186)
    at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:105)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:769)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 87 for operator Sink: adapterOutput (19/30).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1115)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1057)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:731)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:643)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:588)
    ... 8 common frames omitted
Caused by: java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:375)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:363)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)
    ... 13 common frames omitted

TM log (RUNNING to CANCELING):
2020-06-22 15:39:19.816 INFO com.xxx.client.consumer.GroupConsumer - consumer xxx to jmq1230:xxx,READ,xxx,NONE is stopped.
2020-06-22 15:39:19.816 INFO org.apache.flink.runtime.taskmanager.Task - Source: baseInfo (79/90) (4e62a84f251d9c68a54e464cff51171e) switched from RUNNING to CANCELING.

Is this a known issue?

Hi Andrew,
this looks like your Flink cluster has a flaky connection to the Kafka cluster, or your Kafka cluster was down.

Since the operator failed in the synchronous part of the snapshot, the task failed outright to avoid leaving the operator state inconsistent. If you configured restarts, the job simply restarts from your last checkpoint (86) and recomputes the data.

What would be your expectation? That the checkpoint fails but the job continues without a restart?

In general, the issue with Kafka is that the transactions used for exactly-once eventually time out. So if too many checkpoints cannot be taken, you would ultimately end up with incorrect data. Hence, failing and restarting is cleaner, as it guarantees consistent data.

--
Arvid Heise | Senior Java Developer <https://www.ververica.com/>
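
To illustrate why a send failure only surfaces when checkpoint 87 is taken, here is a minimal, self-contained Java sketch. It is not Flink's actual code: `CheckpointFlushSketch`, `AsyncSink`, `simulate`, and the `brokerAvailable` flag are hypothetical names that only model the pattern visible in the stack trace above (`snapshotState` calling `checkErroneous`), under the assumption that send errors are recorded by an asynchronous callback and rethrown during the synchronous part of the snapshot.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointFlushSketch {

    /** Hypothetical stand-in for a sink that sends records asynchronously. */
    static class AsyncSink {
        private final boolean brokerAvailable;              // assumption: models the Kafka connection
        private final List<String> pending = new ArrayList<>();
        private Exception asyncError;                       // would be set by the producer's send callback

        AsyncSink(boolean brokerAvailable) {
            this.brokerAvailable = brokerAvailable;
        }

        /** Buffers a record; the real send would complete later on another thread. */
        void invoke(String record) {
            pending.add(record);
        }

        /** Waits for all buffered records; a lost connection is recorded, not thrown. */
        private void flush() {
            if (!pending.isEmpty() && !brokerAvailable) {
                asyncError = new Exception(
                        "The server disconnected before a response was received.");
            }
            pending.clear();
        }

        /** Synchronous part of the snapshot: flush, then surface any recorded error. */
        void snapshotState(long checkpointId) throws Exception {
            flush();
            checkErroneous();
        }

        /** Rethrows an error that an earlier asynchronous send recorded. */
        private void checkErroneous() throws Exception {
            Exception e = asyncError;
            if (e != null) {
                asyncError = null;
                throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
            }
        }
    }

    /** Sends two records, takes one checkpoint, and reports what the task would do. */
    public static String simulate(boolean brokerAvailable, long checkpointId) {
        AsyncSink sink = new AsyncSink(brokerAvailable);
        sink.invoke("record-1");
        sink.invoke("record-2");
        try {
            sink.snapshotState(checkpointId);
            return "checkpoint " + checkpointId + " completed";
        } catch (Exception e) {
            // In this model, an exception in the synchronous snapshot fails the task.
            return "task failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate(true, 86));   // healthy connection
        System.out.println(simulate(false, 87));  // connection lost before the checkpoint
    }
}
```

In this model the sink task is the one that fails, and the source subtasks are only cancelled as a consequence of the restart, which would match the later RUNNING to CANCELING entry in the TM log.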