Re: Flink job checkpoint cannot complete snapshot, and Kafka exceptions are reported


Yichao Yang
Hi


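From the error, the job failed because the number of failed checkpoints exceeded the maximum allowed. The checkpoint interval is set too small. On our team the interval is usually on the order of minutes, and we typically use 5 minutes. Checkpointing is only a fault-tolerance mechanism, so unless the use case has special requirements there is no need for such a short interval, and frequent checkpoints cause performance problems.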
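The failure mode described above, where failed checkpoints exceed an allowed count and the job is then failed, can be pictured with a small counter. The sketch below is a simplified stand-in and not Flink's own code: the class `CheckpointFailureModel`, its methods, and the `tolerableFailures` parameter are hypothetical names, and counting only consecutive failures is an assumption of this model.

```java
// Simplified model of a "tolerable checkpoint failure" threshold.
// Hypothetical illustration only; this is not Flink's implementation.
final class CheckpointFailureModel {
    // Number of consecutive failed checkpoints tolerated before the job fails.
    private final int tolerableFailures;
    private int consecutiveFailures = 0;

    CheckpointFailureModel(int tolerableFailures) {
        if (tolerableFailures < 0) {
            throw new IllegalArgumentException("tolerableFailures must be >= 0");
        }
        this.tolerableFailures = tolerableFailures;
    }

    /** Records one failed checkpoint; returns true if the job should now fail. */
    boolean onCheckpointFailure() {
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }

    /** Records one completed checkpoint; the failure streak starts over. */
    void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }

    public static void main(String[] args) {
        CheckpointFailureModel strict = new CheckpointFailureModel(0);
        // With a threshold of 0, the very first failure fails the job.
        System.out.println(strict.onCheckpointFailure()); // true

        CheckpointFailureModel lenient = new CheckpointFailureModel(2);
        System.out.println(lenient.onCheckpointFailure()); // false
        System.out.println(lenient.onCheckpointFailure()); // false
        System.out.println(lenient.onCheckpointFailure()); // true
    }
}
```

With a threshold of 0, a single declined checkpoint is enough to fail the job. That would be consistent with the quoted log, where checkpoint 1 is declined and a global failure follows immediately, although the thread does not show the job's actual setting.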


Best,
Yichao Yang


------------------ Original Message ------------------
From: Zhefu PENG <[hidden email]>
Sent: 2020-06-10 13:04
To: user-zh <[hidden email]>
Subject: Re: Flink job checkpoint cannot complete snapshot, and Kafka exceptions are reported



Hi all,

We have a simple Flink job. After operator chaining, the execution graph is roughly:
Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter
-> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed

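After running in production for a while, however, the job started reporting errors. The logs say that a checkpoint could not be completed and also mention Kafka network and connection exceptions. Other Flink jobs read from and write to the same brokers without any errors. Our tentative theory is that each checkpoint takes fairly long to complete (a few hundred milliseconds) while our checkpoint interval is short (only one second), and that this may be hurting the job's performance. This is only a shaky guess, though, and we have no good starting point for troubleshooting, so we would appreciate any thoughts or suggestions. Many thanks.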
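To put rough numbers on that guess, the sketch below computes what fraction of wall-clock time a checkpoint would be in progress if one started every interval. The class `CheckpointOverhead`, the method `busyFraction`, and the 300 ms duration are illustrative assumptions (the post only says "a few hundred milliseconds"); the 118 s figure is the gap between the trigger (12:02:49) and decline (12:04:47) timestamps in the log below.

```java
// Back-of-the-envelope arithmetic; names and the 300 ms figure are assumptions.
final class CheckpointOverhead {
    /**
     * Fraction of time (0.0 to 1.0) during which a checkpoint is running,
     * assuming one checkpoint is started every intervalMillis.
     */
    static double busyFraction(long checkpointMillis, long intervalMillis) {
        if (checkpointMillis < 0 || intervalMillis <= 0) {
            throw new IllegalArgumentException("durations must be positive");
        }
        // A checkpoint longer than the interval keeps the job busy all the time.
        return Math.min(1.0, (double) checkpointMillis / intervalMillis);
    }

    public static void main(String[] args) {
        // Assumed 300 ms checkpoint with the 1 s interval from the post.
        System.out.println(busyFraction(300, 1_000));
        // Same checkpoint with the 5 minute interval suggested in the reply.
        System.out.println(busyFraction(300, 300_000));
        // The failing checkpoint in the log ran for about 118 s before being declined.
        System.out.println(busyFraction(118_000, 1_000));
    }
}
```

Under these assumptions a healthy checkpoint occupies roughly a third of every second at a 1 s interval but about a thousandth of the time at 5 minutes. The failing checkpoint ran far longer than the interval, which suggests the interval alone may not explain the Kafka disconnect.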

Part of the error output follows:
2020-06-10 12:02:49,083 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
2020-06-10 12:04:47,898 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Decline checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job c41f4811262db1c4c270b136571c8201 at container_e27_1591466310139_21670_01_000006 @ hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
2020-06-10 12:04:47,899 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: The server disconnected before a response was received.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:973)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:317)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:978)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
    ... 18 more
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
2020-06-10 12:04:47,913 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1467)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1377)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:719)
    at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase.java:807)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Looking forward to your replies and help.
Best,
Zhefu
Re: Flink job checkpoint cannot complete snapshot, and Kafka exceptions are reported

zhefu
Hi Yichao,

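Thanks for your reply. This job had been running in production for about a week and only reported this problem today; we will increase the interval and test again. Also, as I mentioned in my reply just now, I found some related logs on the TM: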
2020-06-10 12:44:40,688 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 5
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 5
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
    ... 8 more

Could this still be related to the checkpoint interval being too short? Hoping for a reply, thanks!

Best,
Zhefu

On Wed, Jun 10, 2020 at 1:24 PM, Yichao Yang <[hidden email]> wrote:

> Hi
>
>
>
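> From the error, the job failed because the number of failed checkpoints exceeded the maximum allowed. The checkpoint interval is set too small. On our team the interval is usually on the order of minutes, and we typically use 5 minutes. Checkpointing is only a fault-tolerance mechanism, so unless the use case has special requirements there is no need for such a short interval, and frequent checkpoints cause performance problems.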
>
>
> Best,
> Yichao Yang