FLINK Checkpoint 问题咨询

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

FLINK Checkpoint 问题咨询

CHENJIE
先谢谢各位大佬!


1.环境
FLINK 版本 :1.7.2
运行模式:flink on yarn (yarn single job)


2.配置
状态保存方式:RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:/user/flink", true)
窗口方式:EventTimeSessionWindows.withGap(Time.hours(1))
计算方式:.aggregate(new MyAggregate(), new MyProcess())


3.数据
数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右


4.需求
监测设备超过1小时没有数据,离线报警
设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间)


5.现象
该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据)


6.咨询问题
a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB
b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样?
c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算?
问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢!


祝大家 头发越来越多,代码BUG越来越少!




--样例数据
IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState SizeBuffered During Alignment
50459/5916:51:3016:51:344s80.1 MB0 B
50359/5916:31:3016:31:333s80.1 MB0 B
50259/5916:11:3016:11:344s80.4 MB0 B
50159/5915:51:3015:52:2353s3.56 GB0 B
50059/5915:31:5115:31:565s76.8 MB0 B
49959/5915:14:1615:14:193s93.8 MB0 B
49859/5914:54:1614:54:203s93.9 MB0 B
49759/5914:34:1614:35:0447s3.54 GB0 B
49659/5914:14:1614:14:203s92.9 MB0 B
49559/5913:54:1613:54:193s92.8 MB0 B
Reply | Threaded
Open this post in threaded view
|

Re: FLINK Checkpoint 问题咨询

Congxian Qiu
你好

1. 窗口数据都会保存的,保存在 State 中,在你的例子中,保存在 RocksDB 中
2. 从给的样例看,应该是增量数据变多了,猜测是往 RocksDB 写数据比较频繁,导致 compaction 之前的 sst 文件无用,这个尝试看
RocksDB 的 log 来验证
3. 窗口的状态暂时无法清楚,你可以用 ProcessFunction [1] 来模拟 window,在 ProcessFunction
中可以按照自己的逻辑清理状态数据

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html
Best,
Congxian


USERNAME <[hidden email]> 于2019年7月16日周二 下午5:36写道:

> 先谢谢各位大佬!
>
>
> 1.环境
> FLINK 版本 :1.7.2
> 运行模式:flink on yarn (yarn single job)
>
>
> 2.配置
> 状态保存方式:RocksDBStateBackend backend = new
> RocksDBStateBackend("hdfs:/user/flink", true)
> 窗口方式:EventTimeSessionWindows.withGap(Time.hours(1))
> 计算方式:.aggregate(new MyAggregate(), new MyProcess())
>
>
> 3.数据
> 数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右
>
>
> 4.需求
> 监测设备超过1小时没有数据,离线报警
> 设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间)
>
>
> 5.现象
> 该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据)
>
>
> 6.咨询问题
> a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB
> b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样?
> c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算?
> 问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢!
>
>
> 祝大家 头发越来越多,代码BUG越来越少!
>
>
>
>
> --样例数据
> IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End
> DurationState SizeBuffered During Alignment
> 50459/5916:51:3016:51:344s80.1 MB0 B
> 50359/5916:31:3016:31:333s80.1 MB0 B
> 50259/5916:11:3016:11:344s80.4 MB0 B
> 50159/5915:51:3015:52:2353s3.56 GB0 B
> 50059/5915:31:5115:31:565s76.8 MB0 B
> 49959/5915:14:1615:14:193s93.8 MB0 B
> 49859/5914:54:1614:54:203s93.9 MB0 B
> 49759/5914:34:1614:35:0447s3.54 GB0 B
> 49659/5914:14:1614:14:203s92.9 MB0 B
> 49559/5913:54:1613:54:193s92.8 MB0 B
Reply | Threaded
Open this post in threaded view
|

Re:FLINK Checkpoint 问题咨询

tangjunliang@huitongjy.com
In reply to this post by CHENJIE
确定两个问题:
1、使用的是rocksdb 增量state?
2、checkpoint的时间间隔设置的多少?
 
 
------------------ Original ------------------
From:  "USERNAME"<[hidden email]>;
Date:  Tue, Jul 16, 2019 05:36 PM
To:  "user-zh"<[hidden email]>;

Subject:  FLINK Checkpoint 问题咨询

 

先谢谢各位大佬!


1.环境
FLINK 版本 :1.7.2
运行模式:flink on yarn (yarn single job)


2.配置
状态保存方式:RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:/user/flink", true)
窗口方式:EventTimeSessionWindows.withGap(Time.hours(1))
计算方式:.aggregate(new MyAggregate(), new MyProcess())


3.数据
数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右


4.需求
监测设备超过1小时没有数据,离线报警
设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间)


5.现象
该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据)


6.咨询问题
a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB
b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样?
c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算?
问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢!


祝大家 头发越来越多,代码BUG越来越少!




--样例数据
IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState SizeBuffered During Alignment
50459/5916:51:3016:51:344s80.1 MB0 B
50359/5916:31:3016:31:333s80.1 MB0 B
50259/5916:11:3016:11:344s80.4 MB0 B
50159/5915:51:3015:52:2353s3.56 GB0 B
50059/5915:31:5115:31:565s76.8 MB0 B
49959/5915:14:1615:14:193s93.8 MB0 B
49859/5914:54:1614:54:203s93.9 MB0 B
49759/5914:34:1614:35:0447s3.54 GB0 B
49659/5914:14:1614:14:203s92.9 MB0 B
49559/5913:54:1613:54:193s92.8 MB0 B
Reply | Threaded
Open this post in threaded view
|

Re:Re:FLINK Checkpoint 问题咨询

CHENJIE
1.是采用增量方式 RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:/user/flink", true)
2.时间间隔 20 分钟
谢谢!


ID      Status      Acknowledged      Trigger Time      Latest Acknowledgement      End to End Duration      State Size      Buffered During Alignment      
504            59/59      16:51:30      16:51:34      4s      80.1 MB      0 B      
503            59/59      16:31:30      16:31:33      3s      80.1 MB      0 B      
502            59/59      16:11:30      16:11:34      4s      80.4 MB      0 B      
501            59/59      15:51:30      15:52:23      53s      3.56 GB      0 B      
500            59/59      15:31:51      15:31:56      5s      76.8 MB      0 B      
499            59/59      15:14:16      15:14:19      3s      93.8 MB      0 B      
498            59/59      14:54:16      14:54:20      3s      93.9 MB      0 B      
497            59/59      14:34:16      14:35:04      47s      3.54 GB      0 B      
496            59/59      14:14:16      14:14:20      3s      92.9 MB      0 B      
495            59/59      13:54:16      13:54:19      3s      92.8 MB      0 B      

在 2019-07-16 20:46:02,"唐军亮" <[hidden email]> 写道:

>确定两个问题:
>1、使用的是rocksdb 增量state?
>2、checkpoint的时间间隔设置的多少?
>
>
>------------------ Original ------------------
>From:  "USERNAME"<[hidden email]>;
>Date:  Tue, Jul 16, 2019 05:36 PM
>To:  "user-zh"<[hidden email]>;
>
>Subject:  FLINK Checkpoint 问题咨询
>
>
>
>先谢谢各位大佬!
>
>
>1.环境
>FLINK 版本 :1.7.2
>运行模式:flink on yarn (yarn single job)
>
>
>2.配置
>状态保存方式:RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:/user/flink", true)
>窗口方式:EventTimeSessionWindows.withGap(Time.hours(1))
>计算方式:.aggregate(new MyAggregate(), new MyProcess())
>
>
>3.数据
>数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右
>
>
>4.需求
>监测设备超过1小时没有数据,离线报警
>设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间)
>
>
>5.现象
>该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据)
>
>
>6.咨询问题
>a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB
>b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样?
>c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算?
>问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢!
>
>
>祝大家 头发越来越多,代码BUG越来越少!
>
>
>
>
>--样例数据
>IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState SizeBuffered During Alignment
>50459/5916:51:3016:51:344s80.1 MB0 B
>50359/5916:31:3016:31:333s80.1 MB0 B
>50259/5916:11:3016:11:344s80.4 MB0 B
>50159/5915:51:3015:52:2353s3.56 GB0 B
>50059/5915:31:5115:31:565s76.8 MB0 B
>49959/5915:14:1615:14:193s93.8 MB0 B
>49859/5914:54:1614:54:203s93.9 MB0 B
>49759/5914:34:1614:35:0447s3.54 GB0 B
>49659/5914:14:1614:14:203s92.9 MB0 B
>49559/5913:54:1613:54:193s92.8 MB0 B