Dear all,
I have a job that consumes data from Kafka and writes it to a PostgreSQL database. The job has restarted, and the YARN logs show that the JobManager hit an OOM. I cannot find the cause: the data volume is very small, so in theory an OOM should not happen. Details:

1. Deployment:
   Flink on YARN, per-job mode, 1024 MB per container
   JobManager JVM options (defaults): -Xms424m -Xmx424m

2. Data:
   Kafka data, roughly one record per minute, plain text, each record very small.

3. Job:
   Very simple: consume from Kafka and write directly to PostgreSQL, with no processing in between and no custom state.
   Source: FlinkKafkaConsumer
   Sink: JDBCAppendTableSink
   Parallelism: 1
   Checkpointing: every 2 minutes, each checkpoint takes about 100 ms
   State backend: RocksDB

4. Error:
2020-07-10 11:51:54,237 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 555 @ 1594353114226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:51:54,421 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 555 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 77 ms).
2020-07-10 11:53:54,253 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 556 @ 1594353234226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:53:54,457 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 556 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 124 ms).
2020-07-10 11:55:54,246 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 557 @ 1594353354226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:55:54,402 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 557 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 115 ms).
2020-07-10 11:56:34,155 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread 'flink-akka.actor.default-dispatcher-4673' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:717)
    at akka.dispatch.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
    at akka.dispatch.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966)
    at akka.dispatch.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1905)
    at akka.dispatch.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
    at akka.dispatch.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool.execute(ForkJoinExecutorConfigurator.scala:30)
    at akka.dispatch.ExecutorServiceDelegate.execute(ThreadPoolBuilder.scala:211)
    at akka.dispatch.ExecutorServiceDelegate.execute$(ThreadPoolBuilder.scala:211)
    at akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.execute(Dispatcher.scala:39)
    at akka.dispatch.Dispatcher.registerForExecution(Dispatcher.scala:115)
    at akka.dispatch.Dispatcher.dispatch(Dispatcher.scala:55)
    at akka.actor.dungeon.Dispatch.sendMessage(Dispatch.scala:142)
    at akka.actor.dungeon.Dispatch.sendMessage$(Dispatch.scala:136)
    at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
    at akka.actor.Cell.sendMessage(ActorCell.scala:350)
    at akka.actor.Cell.sendMessage$(ActorCell.scala:349)
    at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
    at akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:173)
    at akka.actor.Scheduler$$anon$3.run(Scheduler.scala:171)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks all,
nicygan
Hi nicygan,
"unable to create new native thread" means that the JVM could not create a new thread (in this stack trace, an Akka dispatcher worker thread). It does not mean that memory usage is too high.

There are usually three possible causes:
1. The Flink application starts too many threads.
2. The handle limits on the machine are set too low.
3. Other applications on the machine start too many threads.

I suggest checking the ulimit settings on the machine (the file handle limit affects how many threads an application can start) and how the thread count reported by the Flink metrics changes over time.

Best,
shizk233

On Tue, Jul 14, 2020 at 10:31 AM, nicygan <[hidden email]> wrote:
> [...]
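As a rough illustration of the "watch the thread count" suggestion, here is a minimal sketch that uses the standard java.lang.management API to count live threads and group them by name prefix, which can hint at which pool keeps growing. The class name ThreadCountProbe and its helper methods are made up for this example. It only inspects the JVM it runs in, so to look at the JobManager you would have to run equivalent logic inside that process or attach over JMX (or simply take a jstack dump); that part is left as an assumption here.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: summarizes the threads of the current JVM.
final class ThreadCountProbe {

    /** Strips a trailing numeric suffix, e.g. "pool-1-thread-7" becomes "pool-1-thread". */
    static String poolName(String threadName) {
        return threadName.replaceAll("[-_ #]*\\d+$", "");
    }

    /** Counts live threads of the current JVM, grouped by pool-like name prefix. */
    static Map<String, Integer> liveThreadsByPool() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        Map<String, Integer> counts = new TreeMap<>();
        for (ThreadInfo info : mx.getThreadInfo(mx.getAllThreadIds())) {
            if (info == null) {
                continue; // the thread ended between the two calls
            }
            counts.merge(poolName(info.getThreadName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        System.out.println("live threads: " + mx.getThreadCount());
        System.out.println("peak threads: " + mx.getPeakThreadCount());
        System.out.println("started since JVM start: " + mx.getTotalStartedThreadCount());
        // One line per group: count, then the shared name prefix.
        liveThreadsByPool().forEach((pool, n) -> System.out.println(n + "\t" + pool));
    }
}
```

If the live count stays flat while the started count keeps climbing, threads are being created and torn down rather than leaked, and the limit being hit is more likely on the machine side (ulimit, or other processes of the same user) than inside this JVM.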