Hi, I am using Flink 1.10's Flink SQL to do a multi-table join over unbounded streams and am running into an out-of-memory problem. I have a few questions:
1. About using RocksDB: I configure the backend in code like this:

env.setStateBackend(new RocksDBStateBackend("hdfs://beh/user/dc_cbss/flink5/checkpoints", true).getCheckpointBackend());

But in the JobManager startup log I see this INFO line:

Using application-defined state backend: File State Backend (checkpoints: 'hdfs://beh/user/dc_cbss/flink5/checkpoints', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)

Does this mean the RocksDB setup failed? When the job restores from a checkpoint, the log also looks like the FsStateBackend is in use:

java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:831)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:889)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:696)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
    at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
    at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:41)
    at java.io.DataInputStream.readInt(DataInputStream.java:389)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:129)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:151)
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$$Lambda$163/413838894.readElement(Unknown Source)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$$Lambda$138/1902710867.apply(Unknown Source)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$132/1758917910.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
[flink-akka.actor.default-dispatcher-20] INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrat

So I don't know whether the backend is correctly configured.

2. My job sets a state TTL of 24 hours. Once the state grows past 1 GB the job hits the OOM above; GC runs too frequently. I'd like to know how people handle this in production. The memory I allocated is far more than 1 GB, 8 GB per TaskManager:

jobmanager.heap.size: 2048m
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.size: 2048m

Hoping for some help.

[hidden email]
Here is a screenshot of the memory analysis:
https://blog.csdn.net/xiaosannimei/article/details/106259140

[hidden email]

From: [hidden email]
Sent: 2020-05-21 15:10
To: user-zh
Subject: Questions about an OOM when using the rocksdb backend
> [original message quoted above, trimmed]
Hi
From the stack trace, the job is using the HeapStateBackend, so it is definitely not the RocksDBStateBackend. Change the code to
`env.setStateBackend(new RocksDBStateBackend("hdfs://beh/user/dc_cbss/flink5/checkpoints", true));`
and try again.

Best,
Congxian

[hidden email] <[hidden email]> wrote on Thu, May 21, 2020, 3:38 PM:
> [original message quoted above, trimmed]
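Why the File State Backend shows up despite the RocksDB constructor may be worth spelling out. In Flink 1.10, `RocksDBStateBackend(uri, true)` wraps an `FsStateBackend` that it uses only for writing checkpoint streams, and `getCheckpointBackend()` returns that inner backend. Passing the inner backend to `setStateBackend` therefore installs the FsStateBackend (heap state) instead of RocksDB. A minimal sketch of the corrected setup (checkpoint path taken from this thread; the boolean enables incremental checkpointing, as in the original code):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Pass the RocksDBStateBackend itself, NOT .getCheckpointBackend();
        // the second argument turns on incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend(
                "hdfs://beh/user/dc_cbss/flink5/checkpoints", true));

        // ... build the job and call env.execute(...) as before
    }
}
```

With this in place, the JobManager startup log should report a RocksDB state backend rather than "File State Backend", which is an easy way to verify the change took effect.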
But I did set it to true:
env.setStateBackend(
    new RocksDBStateBackend("hdfs://beh/user/dc_cbss/flink5/checkpoints", true)
        .getCheckpointBackend()
);

[hidden email]

From: Congxian Qiu
Sent: 2020-05-21 15:58
To: user-zh
Subject: Re: Questions about an OOM when using the rocksdb backend
> [reply quoted above, trimmed]
Hi
You need to remove the `getCheckpointBackend()` call.

Best,
Congxian

[hidden email] <[hidden email]> wrote on Thu, May 21, 2020, 4:24 PM:
> [previous message quoted above, trimmed]
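As an alternative to setting the backend in code, the same choice can be made cluster-wide in flink-conf.yaml. A sketch using the checkpoint path from this thread (key names as in the Flink 1.10 configuration options):

```yaml
# flink-conf.yaml
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://beh/user/dc_cbss/flink5/checkpoints
```

Note that a `setStateBackend(...)` call in the job code overrides this cluster default, so the code must stop installing the FsStateBackend for the config to take effect.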