flink消费kafka出现GC问题

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

flink消费kafka出现GC问题

nick
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
        at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_271]
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_271]
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_271]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_271]
        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_271]
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_271]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:483)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
~[?:?]
        at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
~[?:?]
        at
org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
~[?:?]

这个任务跑了10几个小时之后就出现这种情况了,这是kafka还是flink的问题?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re:flink消费kafka出现GC问题

Ye Chen
@nick  我们之前也遇到的过GC导致任务挂掉问题,后来排查发现flink代码写的有问题,在@close方法中关闭数据库连接,但是事实上@close方法未起作用,导致资源未释放OOM。

hope this can help you














At 2021-01-22 14:18:51, "nick" <[hidden email]> wrote:

>org.apache.flink.runtime.JobException: Recovery is suppressed by
>NoRestartBackoffTimeStrategy
> at
>org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>~[?:1.8.0_271]
> at
>sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>~[?:1.8.0_271]
> at
>sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>~[?:1.8.0_271]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_271]
> at
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_271]
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_271]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
>~[?:?]
> at
>org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
>~[?:?]
> at
>org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
>~[?:?]
>
>这个任务跑了10几个小时之后就出现这种情况了,这是kafka还是flink的问题?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Re:flink消费kafka出现GC问题

nick
我没有写代码,我全是flink sql写的业务逻辑



--
Sent from: http://apache-flink.147419.n8.nabble.com/