Questions about out-of-memory errors when using the RocksDB backend


1101300123
Hi, I'm running Flink 1.10 and using Flink SQL to join multiple tables over unbounded streams, and I'm hitting an out-of-memory error. I have a few questions:

1. To use RocksDB, I set the backend in code as follows:
env.setStateBackend(new RocksDBStateBackend("hdfs://beh/user/dc_cbss/flink5/checkpoints", true).getCheckpointBackend());

But in the JobManager startup log I see the following INFO line:
Using application-defined state backend: File State Backend (checkpoints: 'hdfs://beh/user/dc_cbss/flink5/checkpoints', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)
Does this mean the RocksDB setup failed?

When my job restores from a checkpoint, the log also suggests the FsStateBackend is in use:

java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:831)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:889)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:696)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:41)
at java.io.DataInputStream.readInt(DataInputStream.java:389)
at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:129)
at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:151)
at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$$Lambda$163/413838894.readElement(Unknown Source)
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$$Lambda$138/1902710867.apply(Unknown Source)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$132/1758917910.run(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
[flink-akka.actor.default-dispatcher-20] INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrat

So I am not sure whether the backend was specified correctly.

2. My job sets a state TTL of 24 hours. Once the state grows past 1 GB, the job runs out of memory with the log above; GC runs far too often. I would like to know how people handle this in production, since the memory I allocated is far more than 1 GB.
Each TaskManager has 8 GB of memory:
jobmanager.heap.size: 2048m
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.size: 2048m
Any help would be appreciated.



[hidden email]
Re: Questions about out-of-memory errors when using the RocksDB backend

1101300123
Here is a screenshot of the memory analysis:
https://blog.csdn.net/xiaosannimei/article/details/106259140



[hidden email]
 
From: [hidden email]
Sent: 2020-05-21 15:10
To: user-zh
Subject: Questions about out-of-memory errors when using the RocksDB backend
Re: Questions about out-of-memory errors when using the RocksDB backend

Congxian Qiu
Hi
From the stack trace, the job is using the HeapStateBackend, so it is definitely not the RocksDBStateBackend.

Change your code to `env.setStateBackend(new RocksDBStateBackend("hdfs://beh/user/dc_cbss/flink5/checkpoints", true));` and try again.

Best,
Congxian
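
For reference, the same backend choice can also be made cluster-wide in flink-conf.yaml instead of in code. This is a sketch assuming Flink 1.10's configuration keys, reusing the checkpoint path from this thread:

```yaml
# flink-conf.yaml — select RocksDB as the default state backend for all jobs
state.backend: rocksdb
state.checkpoints.dir: hdfs://beh/user/dc_cbss/flink5/checkpoints
# equivalent to the `true` (incremental checkpoints) flag passed in code
state.backend.incremental: true
```

A backend set in code via `env.setStateBackend(...)` overrides this cluster-wide default, which is why the JobManager log above says "Using application-defined state backend".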


[hidden email] <[hidden email]> wrote on Thursday, May 21, 2020, at 3:38 PM:


Re: Re: Questions about out-of-memory errors when using the RocksDB backend

1101300123
But I did set it to true:

env.setStateBackend(
new RocksDBStateBackend("hdfs://beh/user/dc_cbss/flink5/checkpoints" , true)
.getCheckpointBackend()
);


[hidden email]
 
From: Congxian Qiu
Sent: 2020-05-21 15:58
To: user-zh
Subject: Re: Questions about out-of-memory errors when using the RocksDB backend

Re: Re: Questions about out-of-memory errors when using the RocksDB backend

Congxian Qiu
Hi
You need to remove `getCheckpointBackend()`.
Best,
Congxian
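
A minimal, self-contained sketch of why removing `getCheckpointBackend()` matters. These are toy stand-in classes, not Flink's real implementations; they only model the delegation pattern: a RocksDB-style backend stores its checkpoints through an inner file-system backend, and `getCheckpointBackend()` returns that inner object, so the original code handed Flink the file-system backend instead of the RocksDB wrapper.

```java
// Toy model (NOT Flink's real classes) of the backend delegation.
interface StateBackend {}

class FsStateBackend implements StateBackend {
    final String checkpointPath;
    FsStateBackend(String checkpointPath) { this.checkpointPath = checkpointPath; }
}

class RocksDBStateBackend implements StateBackend {
    private final FsStateBackend checkpointBackend;
    RocksDBStateBackend(String checkpointPath, boolean incremental) {
        // checkpoint *storage* is delegated to a file-system backend
        this.checkpointBackend = new FsStateBackend(checkpointPath);
    }
    StateBackend getCheckpointBackend() { return checkpointBackend; }
}

public class BackendDemo {
    public static void main(String[] args) {
        String path = "hdfs://beh/user/dc_cbss/flink5/checkpoints";
        // What the original code effectively registered:
        StateBackend wrong = new RocksDBStateBackend(path, true).getCheckpointBackend();
        // What should be registered:
        StateBackend right = new RocksDBStateBackend(path, true);
        System.out.println(wrong.getClass().getSimpleName()); // FsStateBackend
        System.out.println(right.getClass().getSimpleName()); // RocksDBStateBackend
    }
}
```

Passing `true` only enables incremental checkpoints; it does not change which object `getCheckpointBackend()` returns.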


[hidden email] <[hidden email]> wrote on Thursday, May 21, 2020, at 4:24 PM:
