flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

classic Classic list List threaded Threaded
13 messages Options
Reply | Threaded
Open this post in threaded view
|

flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

bradyMk
CONTENTS DELETED
The author has deleted this message.
Reply | Threaded
Open this post in threaded view
|

答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

范超
中间状态用完了最好是clear掉。不然一直会占用tm的内存的呢

-----邮件原件-----
发件人: bradyMk [mailto:[hidden email]]
发送时间: 2020年12月5日 星期六 17:29
收件人: [hidden email]
主题: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

大家好~

最近刚刚尝试使用flink 1.9.1 的RocksDB做增量checkpoints;

在程序种设置:
    val backend = new RocksDBStateBackend("hdfs://xx/", true)
   
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)
并用MapState保存中间状态;(中间状态大概10个G);

我启动程序时,给taskmanager设置了3G内存:“-ytm 3072m \”,但是我的程序每跑一段时间都会报出超出物理内存的错误:"is
running beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical memory used; 6.2 GB of 14.6 TB virtual memory used"

我对此有点不解,RocksDB不是会定期把状态写到hdfs么?为什么内存占用会越来越大,最终被yarn
kill掉呢?难道是我漏掉了什么参数配置?希望各位能指点迷津~谢谢大家




-----
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

bradyMk
CONTENTS DELETED
The author has deleted this message.
Reply | Threaded
Open this post in threaded view
|

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

Yun Tang
RocksDB只是将数据可以存储在磁盘上,Flink再周期性将磁盘上数据上传到HDFS,内存中还是有LSM的write buffer以及block cache,也还是需要使用内存的

建议升级Flink版本到1.10+,引入了managed memory功能,理论上对于内存控制是要好很多的。


祝好
唐云
________________________________
From: bradyMk <[hidden email]>
Sent: Monday, December 7, 2020 11:27
To: [hidden email] <[hidden email]>
Subject: Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

hi~谢谢解答;

但我的状态用的是RocksDB,实质上不应该是存的磁盘么?为什么会一直占用tm的内存呢?



-----
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

bradyMk
CONTENTS DELETED
The author has deleted this message.
Reply | Threaded
Open this post in threaded view
|

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

Yun Tang
配置一下 RocksDB 的native metrics,看下block cache以及 write buffer的实际使用内存。
另外,Flink中一个state会使用一个RocksDB的column family,而write buffer和block cache是一套column family 一套,所以你的operator 内的state数目多,slot内的keyed operator多,都会导致内存成倍增长


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#rocksdb-native-metrics

祝好
唐云
________________________________
From: bradyMk <[hidden email]>
Sent: Monday, December 7, 2020 17:05
To: [hidden email] <[hidden email]>
Subject: Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

Hi~

可是我这边write buffer以及block cache等参数设置的都不大,都远远小于我分给tm的内存,可为什么还会报超出内存的错误呢?



-----
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

bradyMk
CONTENTS DELETED
The author has deleted this message.
Reply | Threaded
Open this post in threaded view
|

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

bradyMk
In reply to this post by Yun Tang
CONTENTS DELETED
The author has deleted this message.
Reply | Threaded
Open this post in threaded view
|

回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

熊云昆
是线程安全的,mapstate也是keyed state,同一个key的state肯定是同一个线程处理的


| |
熊云昆
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制

在2020年12月07日 18:18,bradyMk 写道:
这面还想多请教一下:

我程序中每来一条数据都会去读MapState然后覆盖写入新的时间戳,刚刚发现某一条数据读出了两条一样的时间戳,我推断是第一个线程读出来后还没等覆盖掉,第二个线程又读了一遍,导致出现两条一样的时间戳;

所以想请问flink中MapState是线程安全的吗?



-----
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

bradyMk
CONTENTS DELETED
The author has deleted this message.
Reply | Threaded
Open this post in threaded view
|

Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

Yun Tang
State本身不是线程安全的 [1],但是目前对于state的更新都是在task主线程内,而task主线程是线程安全的。除非通过一些特别的方式,例如异步的metrics线程用户逻辑下访问state导致的state写更新副作用,一般是不会出现写错的问题。

[1] https://issues.apache.org/jira/browse/FLINK-13072

祝好
唐云
________________________________
From: bradyMk <[hidden email]>
Sent: Tuesday, December 8, 2020 17:59
To: [hidden email] <[hidden email]>
Subject: Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

好的,谢谢大佬解答~



-----
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

bradyMk
CONTENTS DELETED
The author has deleted this message.
Reply | Threaded
Open this post in threaded view
|

Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

Yun Tang
Hi

Operator state 本身也并不是线程安全的,只是往常的读写都是持有checkpoint锁的task主线程或者checkpoint异步线程,所以才能做到数据安全,SourceFunction文档里面也强调需要在获得checkpointLock的前提下更新state。

至于如何开启Flink中的RocksDB的native metrics,之前给你的文档链接里面有描述,相关的配置项设为true即可。

祝好
唐云
________________________________
From: bradyMk <[hidden email]>
Sent: Thursday, December 10, 2020 11:44
To: [hidden email] <[hidden email]>
Subject: Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

谢谢大佬解答~最近一直在看相关的知识,我还有两个问题在网上没有找到解答,想咨询一下:

1、如果我不用keyed State,而改用Operator State,Operator
State是所有线程操作一个state么?如果这样,那Operator State是线程安全的么?

2、您之前说的配置 RocksDB 的native
metrics,我在官网看到这些指标都是禁用的,那该如何开启呢?我在代码里貌似没有找到相关方法开启各类RocksDB 的native metrics;




-----
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/