有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。
数据每隔10秒会过来一批。 代码如下图: ``` val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getConfig.setAutoWatermarkInterval(watermarkGenInterval) env.setParallelism(parallel) env.addSource(source) .map(json => { new InvokingInfoWrapper(xxx) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5)) { override def extractTimestamp(invoking: InvokingInfoWrapper): Long = { invoking.timestamp } }) .keyBy(invokingInfo => { s"${invokingInfo.caller}_${invokingInfo.callee}" }) .timeWindow(Time.seconds(60), Time.seconds(10)) .reduce(innerReducer).map(invokingInfo => { // ##2map //some mapping code invokingInfo }) .addSink(new WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink") ``` 由于是在预发布环境上线, 流量不大,我观察到一个现场如下: 1. 第一条数据的时间戳为03:15:48 2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口) 3. 第三条数据的时间戳为03:16:06, 触发reduce操作(同样5次) 4. 第四条数据的时间戳为03:17:55, 这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤, 然而并没有。 5. 第五条数据的时间戳为03:18:01, 这时候触发了跟第四条数据的reduce操作。 感觉前三条数据给吞了。 为什么呢? |
Hi,
应该是watermark没有达到window的end时间,导致window没有fire。watermark的相关内容可以看这里[1]。其次,你也可以通过job的运行页面[2]查看job当前watermark的值。 Best, Hequn [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams [2] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time On Fri, Jul 12, 2019 at 4:05 PM Ever <[hidden email]> wrote: > 有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。 > 数据每隔10秒会过来一批。 > 代码如下图: > ``` > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > env.getConfig.setAutoWatermarkInterval(watermarkGenInterval) > > > env.setParallelism(parallel) > > > env.addSource(source) > .map(json => { > new InvokingInfoWrapper(xxx) > }) > .assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5)) > { > override def extractTimestamp(invoking: InvokingInfoWrapper): Long > = { > invoking.timestamp > } > }) > .keyBy(invokingInfo => { > s"${invokingInfo.caller}_${invokingInfo.callee}" > }) > .timeWindow(Time.seconds(60), Time.seconds(10)) > .reduce(innerReducer).map(invokingInfo => { // ##2map > //some mapping code > invokingInfo > }) > .addSink(new > WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink") > > ``` > > > > > 由于是在预发布环境上线, 流量不大,我观察到一个现场如下: > 1. 第一条数据的时间戳为03:15:48 > 2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口) > 3. 第三条数据的时间戳为03:16:06, 触发reduce操作(同样5次) > 4. 第四条数据的时间戳为03:17:55, > 这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤, 然而并没有。 > 5. 第五条数据的时间戳为03:18:01, 这时候触发了跟第四条数据的reduce操作。 > > > 感觉前三条数据给吞了。 > > > 为什么呢? |
第四条数据来的时间戳是: 03:17:55, 水印时间这时候应该是03:17:50, 不管是大窗口的关闭时间(第一条数据(03:15:48)的大窗口关闭时间:03:16:50)还是小的滑动窗口关闭时间, 都已经过了, 都应该关闭了啊
------------------ 原始邮件 ------------------ 发件人: "Hequn Cheng"<[hidden email]>; 发送时间: 2019年7月14日(星期天) 中午11:55 收件人: "user-zh"<[hidden email]>; 主题: Re: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题 Hi, 应该是watermark没有达到window的end时间,导致window没有fire。watermark的相关内容可以看这里[1]。其次,你也可以通过job的运行页面[2]查看job当前watermark的值。 Best, Hequn [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams [2] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time On Fri, Jul 12, 2019 at 4:05 PM Ever <[hidden email]> wrote: > 有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。 > 数据每隔10秒会过来一批。 > 代码如下图: > ``` > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > env.getConfig.setAutoWatermarkInterval(watermarkGenInterval) > > > env.setParallelism(parallel) > > > env.addSource(source) > .map(json => { > new InvokingInfoWrapper(xxx) > }) > .assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5)) > { > override def extractTimestamp(invoking: InvokingInfoWrapper): Long > = { > invoking.timestamp > } > }) > .keyBy(invokingInfo => { > s"${invokingInfo.caller}_${invokingInfo.callee}" > }) > .timeWindow(Time.seconds(60), Time.seconds(10)) > .reduce(innerReducer).map(invokingInfo => { // ##2map > //some mapping code > invokingInfo > }) > .addSink(new > WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink") > > ``` > > > > > 由于是在预发布环境上线, 流量不大,我观察到一个现场如下: > 1. 第一条数据的时间戳为03:15:48 > 2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口) > 3. 第三条数据的时间戳为03:16:06, 触发reduce操作(同样5次) > 4. 第四条数据的时间戳为03:17:55, > 这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤, 然而并没有。 > 5. 第五条数据的时间戳为03:18:01, 这时候触发了跟第四条数据的reduce操作。 > > > 感觉前三条数据给吞了。 > > > 为什么呢? |
并不是没条消息会触发watermark,而是有一定时间间隔的,默认是200ms触发一次watermark。
当你的数据来的比较集中的时候,经常会发生最新的消息的时间戳已经过了window end,但是window还没fire的情况。 -----邮件原件----- 发件人: Ever <[hidden email]> 发送时间: Sunday, July 14, 2019 5:00 PM 收件人: user-zh <[hidden email]> 主题: 回复: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题 第四条数据来的时间戳是: 03:17:55, 水印时间这时候应该是03:17:50, 不管是大窗口的关闭时间(第一条数据(03:15:48)的大窗口关闭时间:03:16:50)还是小的滑动窗口关闭时间, 都已经过了, 都应该关闭了啊 ------------------ 原始邮件 ------------------ 发件人: "Hequn Cheng"<[hidden email]>; 发送时间: 2019年7月14日(星期天) 中午11:55 收件人: "user-zh"<[hidden email]>; 主题: Re: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题 Hi, 应该是watermark没有达到window的end时间,导致window没有fire。watermark的相关内容可以看这里[1]。其次,你也可以通过job的运行页面[2]查看job当前watermark的值。 Best, Hequn [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams [2] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time On Fri, Jul 12, 2019 at 4:05 PM Ever <[hidden email]> wrote: > 有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。 > 数据每隔10秒会过来一批。 > 代码如下图: > ``` > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > env.getConfig.setAutoWatermarkInterval(watermarkGenInterval) > > > env.setParallelism(parallel) > > > env.addSource(source) > .map(json => { > new InvokingInfoWrapper(xxx) > }) > .assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seco > nds(5)) > { > override def extractTimestamp(invoking: InvokingInfoWrapper): > Long = { > invoking.timestamp > } > }) > .keyBy(invokingInfo => { > s"${invokingInfo.caller}_${invokingInfo.callee}" > }) > .timeWindow(Time.seconds(60), Time.seconds(10)) > .reduce(innerReducer).map(invokingInfo => { // ##2map > //some mapping code > invokingInfo > }) > .addSink(new > WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-s > ink") > > ``` > > > > > 由于是在预发布环境上线, 流量不大,我观察到一个现场如下: > 1. 第一条数据的时间戳为03:15:48 > 2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口) > 3. 第三条数据的时间戳为03:16:06, 触发reduce操作(同样5次) > 4. 第四条数据的时间戳为03:17:55, > 这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤, 然而并没有。 > 5. 第五条数据的时间戳为03:18:01, 这时候触发了跟第四条数据的reduce操作。 > > > 感觉前三条数据给吞了。 > > > 为什么呢? |
Free forum by Nabble | Edit this page |