Flink 1.9.1: job that consumes Kafka and writes to a PostgreSQL database fails


nicygan
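dear all:
      I have a job that consumes data from Kafka and writes it to a PostgreSQL database. The job has restarted, and the YARN logs show that the JobManager hit an OOM, but I cannot find the actual cause: the data volume is tiny, so an OOM should not happen.
      Details below: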


1. Deployment:
Flink on YARN, per-job mode, 1024 MB per container
JobManager JVM options (defaults): -Xms424m -Xmx424m
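
For reference, a 424 MB heap inside a 1024 MB container is what the legacy containerized heap cutoff would produce. The sketch below reproduces the arithmetic, assuming the Flink 1.9 defaults `containerized.heap-cutoff-ratio: 0.25` and `containerized.heap-cutoff-min: 600`; these values are not stated in the post, and the class and method names are only illustrative.

```java
public class JobManagerHeap {
    // Assumed defaults, not taken from the post:
    static final double CUTOFF_RATIO = 0.25; // containerized.heap-cutoff-ratio
    static final int CUTOFF_MIN_MB = 600;    // containerized.heap-cutoff-min

    /** Heap size left after reserving the larger of the ratio-based and minimum cutoff. */
    static int heapMb(int containerMb, double cutoffRatio, int cutoffMinMb) {
        int cutoffMb = (int) Math.max(cutoffMinMb, containerMb * cutoffRatio);
        return containerMb - cutoffMb;
    }

    public static void main(String[] args) {
        // 1024 MB container: the cutoff is max(600, 256) = 600 MB, so the heap is 424 MB.
        System.out.println("-Xmx" + heapMb(1024, CUTOFF_RATIO, CUTOFF_MIN_MB) + "m");
    }
}
```

Under these assumptions roughly 600 MB of the container stays outside the Java heap, which is also where native thread stacks are allocated.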


2. Data:
Kafka data, about one record per minute; text data, and each record is very small.


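3. Job:
Very simple: it consumes from Kafka and writes straight to the PostgreSQL database, with no processing in between and no custom state.
Source: FlinkKafkaConsumer
Sink: JDBCAppendTableSink
Parallelism: 1
Checkpointing: every 2 minutes, each checkpoint takes about 100 ms
State backend: RocksDB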


4. Error:
2020-07-10 11:51:54,237 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 555 @ 1594353114226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:51:54,421 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 555 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 77 ms).
2020-07-10 11:53:54,253 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 556 @ 1594353234226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:53:54,457 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 556 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 124 ms).
2020-07-10 11:55:54,246 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 557 @ 1594353354226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:55:54,402 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 557 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 115 ms).
2020-07-10 11:56:34,155 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler       - FATAL: Thread 'flink-akka.actor.default-dispatcher-4673' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:717)
        at akka.dispatch.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
        at akka.dispatch.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966)
        at akka.dispatch.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1905)
        at akka.dispatch.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
        at akka.dispatch.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool.execute(ForkJoinExecutorConfigurator.scala:30)
        at akka.dispatch.ExecutorServiceDelegate.execute(ThreadPoolBuilder.scala:211)
        at akka.dispatch.ExecutorServiceDelegate.execute$(ThreadPoolBuilder.scala:211)
        at akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.execute(Dispatcher.scala:39)
        at akka.dispatch.Dispatcher.registerForExecution(Dispatcher.scala:115)
        at akka.dispatch.Dispatcher.dispatch(Dispatcher.scala:55)
        at akka.actor.dungeon.Dispatch.sendMessage(Dispatch.scala:142)
        at akka.actor.dungeon.Dispatch.sendMessage$(Dispatch.scala:136)
        at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
        at akka.actor.Cell.sendMessage(ActorCell.scala:350)
        at akka.actor.Cell.sendMessage$(ActorCell.scala:349)
        at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
        at akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:173)
        at akka.actor.Scheduler$$anon$3.run(Scheduler.scala:171)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

thanks all / by nicygan
Re: Flink 1.9.1: job that consumes Kafka and writes to a PostgreSQL database fails

shizk233
Hi nicygan,

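"unable to create new native thread" means the JVM could not start a new thread (in the trace above, a worker for the Akka dispatcher pool); it does not mean that memory usage is too high.
There are generally three possible causes:
1. The Flink application starts too many threads.
2. The handle limits on the machine are set too low.
3. Other applications on the machine start too many threads.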
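
To tell cause 1 apart from the others, it can help to see which thread pools inside the JobManager JVM are large. A minimal sketch, assuming a thread dump from `jstack <pid>` is piped to standard input; the class name `ThreadsByPool` and the grouping rule (dropping a trailing number from the thread name) are illustrative choices, not part of Flink.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.TreeMap;

public class ThreadsByPool {
    /** Returns the quoted thread name from a jstack header line, or null for any other line. */
    static String nameOf(String line) {
        if (!line.startsWith("\"")) return null;
        int end = line.indexOf('"', 1);
        return end > 0 ? line.substring(1, end) : null;
    }

    /** Drops a trailing numeric suffix, so "default-dispatcher-4673" groups under "default-dispatcher". */
    static String pool(String threadName) {
        return threadName.replaceAll("-?\\d+$", "");
    }

    public static void main(String[] args) throws IOException {
        Map<String, Integer> counts = new TreeMap<>();
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        for (String line; (line = in.readLine()) != null; ) {
            String name = nameOf(line);
            if (name != null) counts.merge(pool(name), 1, Integer::sum);
        }
        // One line per pool: thread count, then the pool's name prefix.
        counts.forEach((p, n) -> System.out.println(n + "\t" + p));
    }
}
```

After compiling, it could be run as `jstack <jobmanager-pid> | java ThreadsByPool`. Taking two dumps some minutes apart and comparing the counts shows whether any pool keeps growing.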

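I suggest checking the ulimit settings on the machine (the file-handle limit affects how many threads an application can start) and how the thread count reported in the Flink metrics changes over time.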
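
The limits that actually apply to a running process can differ from what `ulimit` prints in an interactive shell, so it may be worth reading them from the process itself. A minimal sketch, assuming a Linux host where `/proc/<pid>/limits` is available; the class name `ProcLimits` is illustrative. On Linux, threads count against "Max processes" (`ulimit -u`), while "Max open files" corresponds to `ulimit -n`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;

public class ProcLimits {
    /** Finds the row whose name matches and returns its soft limit, e.g. "4096" or "unlimited". */
    static Optional<String> softLimit(List<String> lines, String limitName) {
        for (String line : lines) {
            if (line.startsWith(limitName)) {
                String rest = line.substring(limitName.length()).trim();
                return Optional.of(rest.split("\\s+")[0]);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        // Pass /proc/<jobmanager-pid>/limits as the argument; defaults to this JVM's own limits.
        String path = args.length > 0 ? args[0] : "/proc/self/limits";
        List<String> lines = Files.readAllLines(Paths.get(path));
        for (String name : new String[] {"Max processes", "Max open files"}) {
            System.out.println(name + ": " + softLimit(lines, name).orElse("not found"));
        }
    }
}
```

A low "Max processes" value combined with many threads owned by the same user, including those of other applications, would fit causes 2 and 3 above.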

Best,
shizk233



nicygan <[hidden email]> wrote on Tue, Jul 14, 2020 at 10:31 AM:

> [...]