我用Flink SQL在一个statement里塞了60个insert into,Source都是从同一个upsert-kafka来的,写出到kudu表,但是发现报这个错误,看起来像是kudu的client申请了netty的direct memory,但是由于sink的表太多,报申请的direct memory超出了flink的task-off memory中的direct memory,能不能大致说一下调整哪个参数,是taskmanager.memory.task.off-heap.size么?以及如何调整出一个相对合适的新值?
2021-07-14 13:35:53 java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown... at java.nio.Bits.reserveMemory(Bits.java:694) at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.SocketSendBufferPool$Preallocation.<init>(SocketSendBufferPool.java:156) at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.SocketSendBufferPool.<init>(SocketSendBufferPool.java:42) at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:45) at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45) at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioWorkerPool.newWorker(NioWorkerPool.java:44) at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioWorkerPool.newWorker(NioWorkerPool.java:28) at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:80) at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39) at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:33) at org.apache.kudu.client.AsyncKuduClient$AsyncKuduClientBuilder.createChannelFactory(AsyncKuduClient.java:2740) at org.apache.kudu.client.AsyncKuduClient$AsyncKuduClientBuilder.access$000(AsyncKuduClient.java:2589) at org.apache.kudu.client.AsyncKuduClient.<init>(AsyncKuduClient.java:367) at org.apache.kudu.client.AsyncKuduClient.<init>(AsyncKuduClient.java:261) at org.apache.kudu.client.AsyncKuduClient$AsyncKuduClientBuilder.build(AsyncKuduClient.java:2762) at org.apache.kudu.client.KuduClient$KuduClientBuilder.build(KuduClient.java:543) at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.obtainClient(KuduWriter.java:76) at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.<init>(KuduWriter.java:66) at org.colloh.flink.kudu.connector.table.sink.KuduSink.open(KuduSink.java:87) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at java.lang.Thread.run(Thread.java:748) |
Free forum by Nabble | Edit this page |