没人回复大概是之前没人遇到过这种问题,所以下午看了flink的代码,终于有了点头绪。
原因分析: 这个异常的原因就是在task出现异常之后,它需要调用updateTaskExecutionState(TaskExecutionState taskExecutionState)这个rpc接口去通知flink jobmanager 去改变对应task的状态并且重启task。但是呢,taskExecutionState这个参数里面有个error属性,当我的的task打出来的错误栈太多的时候,在序列化的之后超过了 rpc接口要求的最大数据大小(也就是maximum akka framesize),导致调用updateTaskExecutionState 这个rpc接口失败,jobmanager无法获知这个task已经fail 的状态,也无法重启。这就导致了一系列连锁反应,其中一个就是我的checkpoint一直失败,原因就是我的task其实已经释放了,但是jobmanger无法感知。 结论: 这个算不算flink的一个bug,对于task已经失效,但是无法通知到jobmanger,导致该task一直无法重启。 原始邮件 发件人:[hidden email] 收件人:[hidden email] 发送时间:2020年3月9日(周一) 11:06 主题:flink 长时间运行后出现报错 大家好: 我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗? 2020-03-08 06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler - Could not create remote rpc invocation message. Failing rpc invocation because... java.io.IOException: The rpc invocation size 34500577 exceeds the maximum akka framesize. at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129) at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78) at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428) at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458) at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154) at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at akka.actor.ActorCell.invoke(ActorCell.scala:496) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2020-03-08 06:10:30,480 ERROR org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Caught exception while executing runnable in main thread. java.lang.reflect.UndeclaredThrowableException at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428) at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458) at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154) at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at akka.actor.ActorCell.invoke(ActorCell.scala:496) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.io.IOException: The rpc invocation size 34500577 exceeds the maximum akka framesize. at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129) at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78) ... 26 more |
Free forum by Nabble | Edit this page |