tumbling的窗口更新随着job运行时间越长,delay越久,sliding不会

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

tumbling的窗口更新随着job运行时间越长,delay越久,sliding不会

marble.zhong@coinflex.com.INVALID
大家好。

我用的tumbling window,
ds.keyBy(CandleView::getMarketCode)
                .timeWindow(Time.minutes(5L))
               
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
                .aggregate(new OhlcAggregateFunction(), new
OhlcWindowFunction())
                .addSink(new PgSink(jdbcUrl, userName, password,
candle_table_5m))
                .name(candle_table_5m);

Sliding Window:

ds.keyBy(CandleView::getMarketCode)
                .timeWindow(Time.hours(24L), Time.seconds(2))
                .aggregate(new OhlcAggregateFunction(), new
TickerWindowFunction())
                .addSink(new PgSink(jdbcUrl, userName, password,
candle_table_24h))
                .name(candle_table_24h);

一个是基于5分钟的窗口,一个是基于24小时的sliding窗口,24小时的窗口都已经update到了最新时间,但5分钟的滞后了越来越长时间,job运行不到2小时,已经滞后快20分钟,即将近4个窗口。
基于的是同一个dataStream

有没有什么建议,或者哪个地方用错了? 谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re:tumbling的窗口更新随着job运行时间越长,delay越久,sliding不会

hailongwang
Hi marble,
看到你是在 window 内一直使用 agg 累加的,所以可以使用 filesystem backend 加速,但是可能内存会相对耗的比较多。因为rocksdb backend的话,每一条数据都会有一次put 和 get 的 IO 操作,故会比较慢些。
至于你提到的为什么 24h size,2s slide 的窗口没有延迟,5 min,1s 的连续 trigger 缺延迟了。这两者的行为不一样,其实没有什么可比的。
对于第二种,trigger 是依靠 timer 注册触发的,这样的话每秒都需要进行触发(如果是 process time),这样可能会太密集了。




Best,
Hailong Wang
在 2020-10-28 16:21:24,"[hidden email]" <[hidden email]> 写道:

>大家好。
>
>我用的tumbling window,
>ds.keyBy(CandleView::getMarketCode)
>                .timeWindow(Time.minutes(5L))
>              
>.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
>                .aggregate(new OhlcAggregateFunction(), new
>OhlcWindowFunction())
>                .addSink(new PgSink(jdbcUrl, userName, password,
>candle_table_5m))
>                .name(candle_table_5m);
>
>Sliding Window:
>
>ds.keyBy(CandleView::getMarketCode)
>                .timeWindow(Time.hours(24L), Time.seconds(2))
>                .aggregate(new OhlcAggregateFunction(), new
>TickerWindowFunction())
>                .addSink(new PgSink(jdbcUrl, userName, password,
>candle_table_24h))
>                .name(candle_table_24h);
>
>一个是基于5分钟的窗口,一个是基于24小时的sliding窗口,24小时的窗口都已经update到了最新时间,但5分钟的滞后了越来越长时间,job运行不到2小时,已经滞后快20分钟,即将近4个窗口。
>基于的是同一个dataStream
>
>有没有什么建议,或者哪个地方用错了? 谢谢
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/