大家好:
我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗? 2020-03-08 06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler - Could not create remote rpc invocation message. Failing rpc invocation because... java.io.IOException: The rpc invocation size 34500577 exceeds the maximum akka framesize. at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129) at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78) at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428) at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458) at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154) at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at akka.actor.ActorCell.invoke(ActorCell.scala:496) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2020-03-08 06:10:30,480 ERROR org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Caught exception while executing runnable in main thread. java.lang.reflect.UndeclaredThrowableException at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428) at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458) at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154) at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at akka.actor.ActorCell.invoke(ActorCell.scala:496) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.io.IOException: The rpc invocation size 34500577 exceeds the maximum akka framesize. at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129) at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78) ... 26 more |
我帮你 cc 了对 runtime 更熟悉的 zhuzhu 同学。
Best, Kurt On Mon, Mar 9, 2020 at 6:44 PM lucas.wu <[hidden email]> wrote: > 没人回复大概是之前没人遇到过这种问题,所以下午看了flink的代码,终于有了点头绪。 > 原因分析: > 这个异常的原因就是在task出现异常之后,它需要调用updateTaskExecutionState(TaskExecutionState > taskExecutionState)这个rpc接口去通知flink jobmanager > > 去改变对应task的状态并且重启task。但是呢,taskExecutionState这个参数里面有个error属性,当我的的task打出来的错误栈太多的时候,在序列化的之后超过了 > rpc接口要求的最大数据大小(也就是maximum akka framesize),导致调用updateTaskExecutionState > 这个rpc接口失败,jobmanager无法获知这个task已经fail > > 的状态,也无法重启。这就导致了一系列连锁反应,其中一个就是我的checkpoint一直失败,原因就是我的task其实已经释放了,但是jobmanger无法感知。 > > 结论: > 这个算不算flink的一个bug,对于task已经失效,但是无法通知到jobmanger,导致该task一直无法重启。 > 原始邮件 > 发件人:[hidden email] > 收件人:[hidden email] > 发送时间:2020年3月9日(周一) 11:06 > 主题:flink 长时间运行后出现报错 > > > 大家好: 我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc > outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗? 2020-03-08 > 06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler - > Could not create remote rpc invocation message. Failing rpc invocation > because... java.io.IOException: The rpc invocation size 34500577 exceeds > the maximum akka framesize. at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78) > at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at > org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > akka.actor.Actor$class.aroundReceive(Actor.scala:517) at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at > akka.actor.ActorCell.invoke(ActorCell.scala:496) at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at > akka.dispatch.Mailbox.run(Mailbox.scala:224) at > akka.dispatch.Mailbox.exec(Mailbox.scala:234) at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2020-03-08 06:10:30,480 ERROR > org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Caught exception while > executing runnable in main thread. > java.lang.reflect.UndeclaredThrowableException at > com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at > org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > akka.actor.Actor$class.aroundReceive(Actor.scala:517) at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at > akka.actor.ActorCell.invoke(ActorCell.scala:496) at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at > akka.dispatch.Mailbox.run(Mailbox.scala:224) at > akka.dispatch.Mailbox.exec(Mailbox.scala:234) at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.io.IOException: The rpc invocation size 34500577 exceeds > the maximum akka framesize. at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78) > ... 26 more |
Free forum by Nabble | Edit this page |