Flink batch 模式消费hdfs上的文件,并做了一个word count 操作,但是task一直运行,查看taskmanager的log,发现如下异常:
java.lang.reflect.UndeclaredThrowableException: null at com.sun.proxy.$Proxy35.updateTaskExecutionState(UnknownSource) ~[?:?] at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1558) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1588) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:173) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1921) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.11.1.jar:1.11.1] at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-dist_2.12-1.11.1.jar:1.11.1] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.11.1.jar:1.11.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-dist_2.12-1.11.1.jar:1.11.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-dist_2.12-1.11.1.jar:1.11.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.11.1.jar:1.11.1] Causedby: java.io.IOException: The rpc invocation size 113602196 exceeds the maximum akka framesize. at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79) ~[flink-dist_2.12-1.11.1.jar:1.11.1] ... 28 more 我有尝试过在flink-conf.yaml配置akka framesize大小为30M,但是还是不能解决上述问题。 请求帮助。 |
如果使用了print()等算子,会将上一个task的结果一次全部pull过来,pull时数据超过了akka framesize大小导致。
李加燕 <[hidden email]> 于2020年9月28日周一 下午3:07写道: > Flink batch 模式消费hdfs上的文件,并做了一个word count > 操作,但是task一直运行,查看taskmanager的log,发现如下异常: > java.lang.reflect.UndeclaredThrowableException: null > at com.sun.proxy.$Proxy35.updateTaskExecutionState(UnknownSource) > ~[?:?] > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1558) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1588) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:173) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1921) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.actor.Actor.aroundReceive(Actor.scala:517) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.actor.Actor.aroundReceive$(Actor.scala:515) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > [flink-dist_2.12-1.11.1.jar:1.11.1] > Causedby: java.io.IOException: The rpc invocation size 113602196 exceeds > the maximum akka framesize. > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > ... 28 more > 我有尝试过在flink-conf.yaml配置akka framesize大小为30M,但是还是不能解决上述问题。 > 请求帮助。 |
Hi,
可以检查一下这个参数是否设置正确,也可以在jobmanager页面上看下是否有这个参数。我之前遇到过类似问题,设置这个参数可以解决问题。 Best, Faaron Zheng ________________________________ From: jy l <[hidden email]> Sent: Monday, September 28, 2020 4:57:46 PM To: [hidden email] <[hidden email]> Subject: Re: Flink Batch 模式下,The rpc invocation size 113602196 exceeds the maximum akka framesize 如果使用了print()等算子,会将上一个task的结果一次全部pull过来,pull时数据超过了akka framesize大小导致。 李加燕 <[hidden email]> 于2020年9月28日周一 下午3:07写道: > Flink batch 模式消费hdfs上的文件,并做了一个word count > 操作,但是task一直运行,查看taskmanager的log,发现如下异常: > java.lang.reflect.UndeclaredThrowableException: null > at com.sun.proxy.$Proxy35.updateTaskExecutionState(UnknownSource) > ~[?:?] > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1558) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1588) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:173) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1921) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.actor.Actor.aroundReceive(Actor.scala:517) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.actor.Actor.aroundReceive$(Actor.scala:515) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > [flink-dist_2.12-1.11.1.jar:1.11.1] > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > [flink-dist_2.12-1.11.1.jar:1.11.1] > Causedby: java.io.IOException: The rpc invocation size 113602196 exceeds > the maximum akka framesize. > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > at > org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > ... 28 more > 我有尝试过在flink-conf.yaml配置akka framesize大小为30M,但是还是不能解决上述问题。 > 请求帮助。 |
Free forum by Nabble | Edit this page |