FLINK consuming Kafka fails with java.lang.OutOfMemoryError: Direct buffer memory

6 messages

FLINK consuming Kafka fails with java.lang.OutOfMemoryError: Direct buffer memory

flink2021
Flink consuming from Kafka keeps failing, and the data in Kafka is not large at the moment, about 10 GB.
Sometimes it fails after a few hours, sometimes after 3-5 minutes. Could it be that some Kafka parameter is not set correctly? The JVM is currently set to 16 GB, and the TM memory is also set fairly high.
 Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
out-of-memory error has occurred. This can mean two things:
 either job(s) require(s) a larger size of JVM direct memory or there is a
direct memory leak.
 The direct memory can be allocated by user code or some of its
dependencies. In this case 'taskmanager.memory.task.off-heap.size'
configuration option should be increased.
 Flink framework and its dependencies also consume the direct memory, mostly
for network communication.
 The most of network memory is managed by Flink and should not result in
out-of-memory error. In certain special cases,
 in particular for jobs with high parallelism, the framework may require
more direct memory which is not managed by Flink.
 In this case 'taskmanager.memory.framework.off-heap.size' configuration
option should be increased.
 If the error persists then there is probably a direct memory leak in user
code or some of its dependencies which has to be investigated and fixed.
 The task executor has to be shutdown...
        at java.nio.Bits.reserveMemory(Bits.java:695)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
        at sun.nio.ch.IOUtil.read(IOUtil.java:195)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:483)
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: FLINK consuming Kafka fails with java.lang.OutOfMemoryError: Direct buffer memory

Shuai Xia
Can you check the memory usage on the Web UI, and look at how these three memory pools are being used?
Also check whether the code uses any native methods.
Framework Off-Heap defaults to 128 MB
Task Off-Heap defaults to 0
Network defaults to 0.1 of TM Total
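As a rough sketch of how that Network pool is sized: the fraction of total Flink memory is clamped to a [min, max] range, mirroring Flink's `taskmanager.memory.network.{fraction,min,max}` options. The 64 MB / 1 GB bounds used below are the documented defaults around Flink 1.12 and are an assumption here, not something stated in the thread:

```python
def network_memory_mb(total_flink_mb, fraction=0.1, min_mb=64, max_mb=1024):
    """Sketch of Flink's network-memory sizing: take `fraction` of the
    total Flink memory, then clamp the result into [min_mb, max_mb]."""
    return min(max(total_flink_mb * fraction, min_mb), max_mb)
```

So with a 16 GB TM, the fraction (about 1.6 GB) is capped at the 1 GB default maximum; increasing the pool further requires raising `taskmanager.memory.network.max` explicitly.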


------------------------------------------------------------------
From: flink2021 <[hidden email]>
Sent: Thursday, February 18, 2021, 16:36
To: user-zh <[hidden email]>
Subject: FLINK consuming Kafka fails with java.lang.OutOfMemoryError: Direct buffer memory

Re: FLINK 消费kafka 报错java.lang.OutOfMemoryError: Direct buffer memory

Luan Cooper
In reply to this post by flink2021
taskmanager.memory.framework.off-heap.size defaults to a fixed value (under 256 MB); it is not computed as a percentage.
The OOM is most likely caused by downstream backpressure.
My suggestion is to directly increase taskmanager.memory.framework.off-heap.size.
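For example, in `flink-conf.yaml` (a sketch, not from the thread; 256m is simply an illustrative doubling of the 128m default):

```yaml
# Double the framework off-heap pool (default is 128m as of Flink 1.10+)
taskmanager.memory.framework.off-heap.size: 256m
```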
On Feb 18, 2021, 16:24 +0800, flink2021 <[hidden email]>, wrote:


Re: FLINK consuming Kafka fails with java.lang.OutOfMemoryError: Direct buffer memory

flink2021
Yes, that's my guess too; some of our Kafka parameters probably need tuning. Could you share what a typical Kafka configuration looks like on your side?
JVM: export KAFKA_HEAP_OPTS="-Xmx14G -Xms14G -server -XX:+UseG1GC
-XX:MaxDirectMemorySize=8192m"
The rest is just the usual configuration:
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# maximum message size the broker can accept
message.max.bytes=200000000
# maximum size of a message the broker can replicate
replica.fetch.max.bytes=204857600
# maximum message size readable on the consumer side
fetch.message.max.bytes=204857600
max.poll.records=500





Re: FLINK consuming Kafka fails with java.lang.OutOfMemoryError: Direct buffer memory

Qishang Zhong
Hi,
I ran into this problem as well. How was it resolved in the end? Version: Flink 1.12.1.

flink2021 <[hidden email]> wrote on Friday, February 19, 2021, at 12:39 PM:

> 嗯,我猜测也是,估计是我们kafka某些参数需要调整。大佬可以帮忙看看你们一般的kafka配置是什么样的呢?
> JVM :export KAFKA_HEAP_OPTS="-Xmx14G -Xms14G -server -XX:+UseG1GC
> -XX:MaxDirectMemorySize=8192m"
> 其它也就是写常规的配置:
> og.segment.bytes=1073741824
> log.retention.check.interval.ms=300000
> #broker能接收消息的最大字节数
> message.max.bytes=200000000
> #broker可复制的消息的最大字节数
> replica.fetch.max.bytes=204857600
> #消费者端的可读取的最大消息
> fetch.message.max.bytes=204857600
> max.poll.records=500
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: FLINK consuming Kafka fails with java.lang.OutOfMemoryError: Direct buffer memory

flink2021
Set taskmanager.memory.framework.off-heap.size to 512 MB and use RocksDB for state storage; it has been very stable since.
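In `flink-conf.yaml` terms, the reported fix corresponds roughly to the following sketch (the config keys are the Flink 1.12-era names, and the checkpoint directory is a hypothetical placeholder, not from the thread):

```yaml
# Raise the framework off-heap pool from the 128m default
taskmanager.memory.framework.off-heap.size: 512m
# Keep operator state out of the JVM heap via RocksDB
state.backend: rocksdb
# Hypothetical path; point this at your own durable storage
state.checkpoints.dir: hdfs:///flink/checkpoints
```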


