先谢谢各位大佬!
1.环境 FLINK 版本 :1.7.2 运行模式:flink on yarn (yarn single job) 2.配置 状态保存方式:RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:/user/flink", true) 窗口方式:EventTimeSessionWindows.withGap(Time.hours(1)) 计算方式:.aggregate(new MyAggregate(), new MyProcess()) 3.数据 数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右 4.需求 监测设备超过1小时没有数据,离线报警 设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间) 5.现象 该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据) 6.咨询问题 a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样? c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算? 问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢! 祝大家 头发越来越多,代码BUG越来越少! --样例数据 IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState SizeBuffered During Alignment 50459/5916:51:3016:51:344s80.1 MB0 B 50359/5916:31:3016:31:333s80.1 MB0 B 50259/5916:11:3016:11:344s80.4 MB0 B 50159/5915:51:3015:52:2353s3.56 GB0 B 50059/5915:31:5115:31:565s76.8 MB0 B 49959/5915:14:1615:14:193s93.8 MB0 B 49859/5914:54:1614:54:203s93.9 MB0 B 49759/5914:34:1614:35:0447s3.54 GB0 B 49659/5914:14:1614:14:203s92.9 MB0 B 49559/5913:54:1613:54:193s92.8 MB0 B |
你好
1. 窗口数据都会保存的,保存在 State 中,在你的例子中,保存在 RocksDB 中 2. 从给的样例看,应该是增量数据变多了,猜测是往 RocksDB 写数据比较频繁,导致 compaction 之前的 sst 文件无用,这个尝试看 RocksDB 的 log 来验证 3. 窗口的状态暂时无法清楚,你可以用 ProcessFunction [1] 来模拟 window,在 ProcessFunction 中可以按照自己的逻辑清理状态数据 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html Best, Congxian USERNAME <[hidden email]> 于2019年7月16日周二 下午5:36写道: > 先谢谢各位大佬! > > > 1.环境 > FLINK 版本 :1.7.2 > 运行模式:flink on yarn (yarn single job) > > > 2.配置 > 状态保存方式:RocksDBStateBackend backend = new > RocksDBStateBackend("hdfs:/user/flink", true) > 窗口方式:EventTimeSessionWindows.withGap(Time.hours(1)) > 计算方式:.aggregate(new MyAggregate(), new MyProcess()) > > > 3.数据 > 数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右 > > > 4.需求 > 监测设备超过1小时没有数据,离线报警 > 设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间) > > > 5.现象 > 该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据) > > > 6.咨询问题 > a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB > b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样? > c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算? > 问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢! > > > 祝大家 头发越来越多,代码BUG越来越少! > > > > > --样例数据 > IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End > DurationState SizeBuffered During Alignment > 50459/5916:51:3016:51:344s80.1 MB0 B > 50359/5916:31:3016:31:333s80.1 MB0 B > 50259/5916:11:3016:11:344s80.4 MB0 B > 50159/5915:51:3015:52:2353s3.56 GB0 B > 50059/5915:31:5115:31:565s76.8 MB0 B > 49959/5915:14:1615:14:193s93.8 MB0 B > 49859/5914:54:1614:54:203s93.9 MB0 B > 49759/5914:34:1614:35:0447s3.54 GB0 B > 49659/5914:14:1614:14:203s92.9 MB0 B > 49559/5913:54:1613:54:193s92.8 MB0 B |
In reply to this post by CHENJIE
确定两个问题:
1、使用的是rocksdb 增量state? 2、checkpoint的时间间隔设置的多少? ------------------ Original ------------------ From: "USERNAME"<[hidden email]>; Date: Tue, Jul 16, 2019 05:36 PM To: "user-zh"<[hidden email]>; Subject: FLINK Checkpoint 问题咨询 先谢谢各位大佬! 1.环境 FLINK 版本 :1.7.2 运行模式:flink on yarn (yarn single job) 2.配置 状态保存方式:RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:/user/flink", true) 窗口方式:EventTimeSessionWindows.withGap(Time.hours(1)) 计算方式:.aggregate(new MyAggregate(), new MyProcess()) 3.数据 数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右 4.需求 监测设备超过1小时没有数据,离线报警 设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间) 5.现象 该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据) 6.咨询问题 a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样? c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算? 问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢! 祝大家 头发越来越多,代码BUG越来越少! --样例数据 IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState SizeBuffered During Alignment 50459/5916:51:3016:51:344s80.1 MB0 B 50359/5916:31:3016:31:333s80.1 MB0 B 50259/5916:11:3016:11:344s80.4 MB0 B 50159/5915:51:3015:52:2353s3.56 GB0 B 50059/5915:31:5115:31:565s76.8 MB0 B 49959/5915:14:1615:14:193s93.8 MB0 B 49859/5914:54:1614:54:203s93.9 MB0 B 49759/5914:34:1614:35:0447s3.54 GB0 B 49659/5914:14:1614:14:203s92.9 MB0 B 49559/5913:54:1613:54:193s92.8 MB0 B |
1.是采用增量方式 RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:/user/flink", true)
2.时间间隔 20 分钟 谢谢! ID Status Acknowledged Trigger Time Latest Acknowledgement End to End Duration State Size Buffered During Alignment 504 59/59 16:51:30 16:51:34 4s 80.1 MB 0 B 503 59/59 16:31:30 16:31:33 3s 80.1 MB 0 B 502 59/59 16:11:30 16:11:34 4s 80.4 MB 0 B 501 59/59 15:51:30 15:52:23 53s 3.56 GB 0 B 500 59/59 15:31:51 15:31:56 5s 76.8 MB 0 B 499 59/59 15:14:16 15:14:19 3s 93.8 MB 0 B 498 59/59 14:54:16 14:54:20 3s 93.9 MB 0 B 497 59/59 14:34:16 14:35:04 47s 3.54 GB 0 B 496 59/59 14:14:16 14:14:20 3s 92.9 MB 0 B 495 59/59 13:54:16 13:54:19 3s 92.8 MB 0 B 在 2019-07-16 20:46:02,"唐军亮" <[hidden email]> 写道: >确定两个问题: >1、使用的是rocksdb 增量state? >2、checkpoint的时间间隔设置的多少? > > >------------------ Original ------------------ >From: "USERNAME"<[hidden email]>; >Date: Tue, Jul 16, 2019 05:36 PM >To: "user-zh"<[hidden email]>; > >Subject: FLINK Checkpoint 问题咨询 > > > >先谢谢各位大佬! > > >1.环境 >FLINK 版本 :1.7.2 >运行模式:flink on yarn (yarn single job) > > >2.配置 >状态保存方式:RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:/user/flink", true) >窗口方式:EventTimeSessionWindows.withGap(Time.hours(1)) >计算方式:.aggregate(new MyAggregate(), new MyProcess()) > > >3.数据 >数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右 > > >4.需求 >监测设备超过1小时没有数据,离线报警 >设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间) > > >5.现象 >该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据) > > >6.咨询问题 >a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB >b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样? >c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算? >问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢! > > >祝大家 头发越来越多,代码BUG越来越少! > > > > >--样例数据 >IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState SizeBuffered During Alignment >50459/5916:51:3016:51:344s80.1 MB0 B >50359/5916:31:3016:31:333s80.1 MB0 B >50259/5916:11:3016:11:344s80.4 MB0 B >50159/5915:51:3015:52:2353s3.56 GB0 B >50059/5915:31:5115:31:565s76.8 MB0 B >49959/5915:14:1615:14:193s93.8 MB0 B >49859/5914:54:1614:54:203s93.9 MB0 B >49759/5914:34:1614:35:0447s3.54 GB0 B >49659/5914:14:1614:14:203s92.9 MB0 B >49559/5913:54:1613:54:193s92.8 MB0 B |
Free forum by Nabble | Edit this page |