Hi
The root cause is a checkpoint failure caused by a failure to send data to Kafka during 'preCommit'. The proper fix is to resolve whatever prevents the data from being sent to Kafka successfully, which is probably a matter on the Kafka side. If you cannot guarantee the state of Kafka from its client and you have no requirement for exactly-once, you can pass FlinkKafkaProducer.Semantic.NONE when creating the Kafka producer to disable sending data during 'preCommit'.

If you don't want the job to fail because of checkpoint errors, you can increase tolerableDeclinedCheckpointNumber:

    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber);

Best
Yun Tang

________________________________
From: jose farfan <[hidden email]>
Sent: Wednesday, January 15, 2020 23:21
To: ouywl <[hidden email]>
Cc: user <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint error .

Hi

I have the same issue.

BR
Jose

On Thu, 9 Jan 2020 at 10:28, ouywl <[hidden email]> wrote:

Hi all:

When I use Flink 1.9.1 to produce data to Kafka 1.1.1, the error shown in Log-1 occurs. The code is:

    input.addSink(
        new FlinkKafkaProducer<KafkaEvent>(
            parameterTool.getRequired("bootstrap.servers"),
            parameterTool.getRequired("output-topic"),
            new KafkaEventDeSchema()));

Log-1:

2020-01-09 09:13:44,476 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job d8827b3f4165b6ba27c8b59c7aa1a400.
2020-01-09 09:15:33,069 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400 at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j (dataPort=33361).
2020-01-09 09:15:33,070 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job d8827b3f4165b6ba27c8b59c7aa1a400.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:311)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    ... 17 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
2020-01-09 09:15:33,074 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job producer data frequece (d8827b3f4165b6ba27c8b59c7aa1a400) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1443)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1353)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:722)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Best,
Ouywl
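
For reference, the two workarounds from Yun Tang's reply at the top of this thread could be combined roughly as follows. This is only a minimal sketch against the Flink 1.9 universal Kafka connector, not the original poster's job: the class name, the String stream with SimpleStringSchema (standing in for KafkaEvent and KafkaEventDeSchema), the broker address, the checkpoint interval, and the tolerance of 3 are all assumptions chosen for illustration. It needs flink-streaming-java and flink-connector-kafka on the classpath.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;

// Hypothetical example class; not part of the original job.
public class KafkaSinkWorkarounds {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 s (arbitrary example interval).
        env.enableCheckpointing(60_000L);

        // Workaround 2: let the job survive a few failed checkpoints
        // instead of failing on the first one. The value 3 is an assumption.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        Properties producerConfig = new Properties();
        // Placeholder address; replace with the real broker list.
        producerConfig.setProperty("bootstrap.servers", "localhost:9092");

        // Stand-in input; the original job used a DataStream<KafkaEvent>.
        DataStream<String> input = env.fromElements("a", "b", "c");

        // Workaround 1: Semantic.NONE gives up delivery guarantees, so the
        // sink does not flush pending records to Kafka during 'preCommit'.
        input.addSink(
            new FlinkKafkaProducer<String>(
                "output-topic",
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
                producerConfig,
                FlinkKafkaProducer.Semantic.NONE));

        env.execute("kafka-sink-workarounds");
    }
}
```

Note that Semantic.NONE means records may be lost or duplicated, so it only fits jobs that can accept that. Neither setting addresses the underlying Kafka TimeoutException shown in Log-1.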