Hi
看报错是checkpoint失败次数超过了最大限制导致任务失败。checkpoint间隔设置太小了,在我们团队通常都是分钟级别的interval,我们一般设置5分钟,checkpoint只是一个容错机制,没有特殊的需求场景不需要设置间隔那么短,并且频繁checkpoint会导致性能问题。 Best, Yichao Yang ------------------ 原始邮件 ------------------ 发件人: Zhefu PENG <[hidden email]> 发送时间: 2020年6月10日 13:04 收件人: user-zh <[hidden email]> 主题: 回复:flink任务checkpoint无法完成snapshot,且报kafka异常 Hi all, 现在有一个简单的flink任务,大概chain在一起后的执行图为: Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。 部分报错信息如下: 2020-06-10 12:02:49,083 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201. 2020-06-10 12:04:47,898 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job c41f4811262db1c4c270b136571c8201 at container_e27_1591466310139_21670_01_000006 @ hdp1-hadoop-datanode-4.novalocal (dataPort=44778). 2020-06-10 12:04:47,899 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job c41f4811262db1c4c270b136571c8201. org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: The server disconnected before a response was received. at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:973) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:317) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:978) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotStat(AbstractStreamOperator.java:402) ... 18 more Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. 2020-06-10 12:04:47,913 INFO org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure. org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1467) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1377) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:719) at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase.java:807) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 期望收到各位的回复和帮助。 Best, Zhefu |
Hi Yichao,
感谢你的回复。因为这个任务已经上线大概一周了,今天才报出这个问题,我们后面会增大间隔并测试。同时,我在刚刚也有回复,我在TM也查到了一些相关日志: 2020-06-10 12:44:40,688 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator. org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 5 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 5 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834) ... 8 more 是不是还是和checkpoint的设立间隔过短有关呢?希望回复,感谢! Best, Zhefu Yichao Yang <[hidden email]> 于2020年6月10日周三 下午1:24写道: > Hi > > > > 看报错是checkpoint失败次数超过了最大限制导致任务失败。checkpoint间隔设置太小了,在我们团队通常都是分钟级别的interval,我们一般设置5分钟,checkpoint只是一个容错机制,没有特殊的需求场景不需要设置间隔那么短,并且频繁checkpoint会导致性能问题。 > > > Best, > Yichao Yang > > > ------------------ 原始邮件 ------------------ > 发件人: Zhefu PENG <[hidden email]> > 发送时间: 2020年6月10日 13:04 > 收件人: user-zh <[hidden email]> > 主题: 回复:flink任务checkpoint无法完成snapshot,且报kafka异常 > > > > Hi all, > > 现在有一个简单的flink任务,大概chain在一起后的执行图为: > Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> > Field_Filter > -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> > Sink: Unnamed > > > 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。 > > 部分报错信息如下: > 2020-06-10 12:02:49,083 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > - Triggering > checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201. > 2020-06-10 12:04:47,898 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > - Decline > checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job > c41f4811262db1c4c270b136571c8201 at > container_e27_1591466310139_21670_01_000006 @ > hdp1-hadoop-datanode-4.novalocal (dataPort=44778). > 2020-06-10 12:04:47,899 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > - Discarding > checkpoint 1 of job c41f4811262db1c4c270b136571c8201. > org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete > snapshot 1 for operator Source: Custom Source -> Map -> Source_Map > -> > Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> > Map -> Map -> > Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined. > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887) > at > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) > at > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) > at > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: > Failed to send data to Kafka: The server disconnected before a response was > received. > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218) > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:973) > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892) > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98) > at > > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:317) > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:978) > at > > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotStat(AbstractStreamOperator.java:402) > ... 18 more > Caused by: org.apache.kafka.common.errors.NetworkException: The server > disconnected before a response was received. > 2020-06-10 12:04:47,913 INFO > org.apache.flink.runtime.jobmaster.JobMaster > > - Trying to recover from a global failure. > org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable > failure threshold. > at > > org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87) > at > > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1467) > at > > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1377) > at > > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:719) > at > > org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase.java:807) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > 期望收到各位的回复和帮助。 > Best, > Zhefu |
Free forum by Nabble | Edit this page |