大家好。
我用的tumbling window,
ds.keyBy(CandleView::getMarketCode)
.timeWindow(Time.minutes(5L))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
.aggregate(new OhlcAggregateFunction(), new
OhlcWindowFunction())
.addSink(new PgSink(jdbcUrl, userName, password,
candle_table_5m))
.name(candle_table_5m);
Sliding Window:
ds.keyBy(CandleView::getMarketCode)
.timeWindow(Time.hours(24L), Time.seconds(2))
.aggregate(new OhlcAggregateFunction(), new
TickerWindowFunction())
.addSink(new PgSink(jdbcUrl, userName, password,
candle_table_24h))
.name(candle_table_24h);
一个是基于5分钟的窗口,一个是基于24小时的sliding窗口,24小时的窗口都已经update到了最新时间,但5分钟的滞后了越来越长时间,job运行不到2小时,已经滞后快20分钟,即将近4个窗口。
基于的是同一个dataStream
有没有什么建议,或者哪个地方用错了? 谢谢
--
Sent from:
http://apache-flink.147419.n8.nabble.com/