Flink consuming from Kafka keeps failing. The data in Kafka is not large at the moment, roughly 10 GB.
Sometimes the job runs for a few hours before failing, sometimes only 3-5 minutes. Could this be a Kafka parameter that is not set correctly? The JVM is currently set to 16 GB and the TaskManager memory is also set fairly high.

Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
    at java.nio.Bits.reserveMemory(Bits.java:695)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
Could you check the memory usage on the web UI, in particular how these three pools are being used?
Also check whether the user code calls any native methods.
Framework Off-Heap defaults to 128 MB, Task Off-Heap defaults to 0, and Network defaults to 10% of the TM total memory.
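For reference, these pools can also be set explicitly in flink-conf.yaml. The snippet below is only a sketch with placeholder sizes, not values tuned for this job; the option names are the standard Flink memory keys:

    taskmanager.memory.framework.off-heap.size: 256m   # direct memory reserved for Flink itself (default 128m)
    taskmanager.memory.task.off-heap.size: 512m        # direct/native memory for user code and its dependencies (default 0)
    taskmanager.memory.network.fraction: 0.1           # network buffers, by default 10% of total Flink memory
    taskmanager.memory.network.min: 64mb
    taskmanager.memory.network.max: 1gb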
In reply to this post by flink2021
taskmanager.memory.framework.off-heap.size defaults to a fixed value (under 256 MB); it is not computed as a percentage.
The OOM is most likely caused by backpressure from downstream operators. The suggestion is simply to increase taskmanager.memory.framework.off-heap.size.
Yes, that is my guess as well; some of our Kafka parameters probably need adjusting. Could you share what your typical Kafka configuration looks like?
JVM: export KAFKA_HEAP_OPTS="-Xmx14G -Xms14G -server -XX:+UseG1GC -XX:MaxDirectMemorySize=8192m"
The rest is just the usual configuration:
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# maximum message size the broker will accept
message.max.bytes=200000000
# maximum message size that can be replicated between brokers
replica.fetch.max.bytes=204857600
# maximum message size the consumer can fetch
fetch.message.max.bytes=204857600
max.poll.records=500
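One observation (not verified against this job): with fetch limits around 200 MB, each broker connection in the shaded Kafka consumer can allocate correspondingly large temporary direct buffers, which easily exceeds the small framework off-heap budget. A sketch of consumer-side properties that cap fetch sizes, passed through the Properties handed to the Flink Kafka source; the values below are illustrative only, not a recommendation:

    fetch.max.bytes=52428800              # cap per fetch response (Kafka default, 50 MB)
    max.partition.fetch.bytes=10485760    # cap per partition per fetch (illustrative; Kafka default is 1 MB)
    max.poll.records=500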
Hi,
I ran into the same problem. How was it solved in the end? Version: Flink 1.12.1.
After setting taskmanager.memory.framework.off-heap.size to 512 MB and using RocksDB for state storage, it is now very stable.
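The corresponding flink-conf.yaml entries would look roughly like this (a sketch; the checkpoint path is a placeholder, not taken from this thread):

    taskmanager.memory.framework.off-heap.size: 512m
    state.backend: rocksdb
    state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder, adjust to your environment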