flink 1.8.1 时间窗口无法关闭以及消息丢失的问题

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.8.1 时间窗口无法关闭以及消息丢失的问题

Ever
有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。
数据每隔10秒会过来一批。
代码如下图:
```
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)


env.setParallelism(parallel)


env.addSource(source)
      .map(json => {
          new InvokingInfoWrapper(xxx)
        })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5)) {
        override def extractTimestamp(invoking: InvokingInfoWrapper): Long = {
          invoking.timestamp
        }
      })
      .keyBy(invokingInfo => {
        s"${invokingInfo.caller}_${invokingInfo.callee}"
      })
      .timeWindow(Time.seconds(60), Time.seconds(10))
      .reduce(innerReducer).map(invokingInfo => { // ##2map
      //some mapping code
      invokingInfo
      })
      .addSink(new WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink")

```




由于是在预发布环境上线, 流量不大,我观察到一个现场如下:
1. 第一条数据的时间戳为03:15:48
2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口)
3. 第三条数据的时间戳为03:16:06,   触发reduce操作(同样5次)
4. 第四条数据的时间戳为03:17:55,   这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤, 然而并没有。
5. 第五条数据的时间戳为03:18:01,  这时候触发了跟第四条数据的reduce操作。


感觉前三条数据给吞了。


为什么呢?
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题

Hequn Cheng
Hi,

应该是watermark没有达到window的end时间,导致window没有fire。watermark的相关内容可以看这里[1]。其次,你也可以通过job的运行页面[2]查看job当前watermark的值。

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
[2]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time

On Fri, Jul 12, 2019 at 4:05 PM Ever <[hidden email]> wrote:

> 有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。
> 数据每隔10秒会过来一批。
> 代码如下图:
> ```
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
>
>
> env.setParallelism(parallel)
>
>
> env.addSource(source)
>       .map(json => {
>           new InvokingInfoWrapper(xxx)
>         })
>       .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5))
> {
>         override def extractTimestamp(invoking: InvokingInfoWrapper): Long
> = {
>           invoking.timestamp
>         }
>       })
>       .keyBy(invokingInfo => {
>         s"${invokingInfo.caller}_${invokingInfo.callee}"
>       })
>       .timeWindow(Time.seconds(60), Time.seconds(10))
>       .reduce(innerReducer).map(invokingInfo => { // ##2map
>       //some mapping code
>       invokingInfo
>       })
>       .addSink(new
> WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink")
>
> ```
>
>
>
>
> 由于是在预发布环境上线, 流量不大,我观察到一个现场如下:
> 1. 第一条数据的时间戳为03:15:48
> 2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口)
> 3. 第三条数据的时间戳为03:16:06,   触发reduce操作(同样5次)
> 4. 第四条数据的时间戳为03:17:55,
>  这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤, 然而并没有。
> 5. 第五条数据的时间戳为03:18:01,  这时候触发了跟第四条数据的reduce操作。
>
>
> 感觉前三条数据给吞了。
>
>
> 为什么呢?
Reply | Threaded
Open this post in threaded view
|

回复: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题

Ever
第四条数据来的时间戳是: 03:17:55,  水印时间这时候应该是03:17:50,  不管是大窗口的关闭时间(第一条数据(03:15:48)的大窗口关闭时间:03:16:50)还是小的滑动窗口关闭时间, 都已经过了, 都应该关闭了啊




------------------ 原始邮件 ------------------
发件人: "Hequn Cheng"<[hidden email]>;
发送时间: 2019年7月14日(星期天) 中午11:55
收件人: "user-zh"<[hidden email]>;

主题: Re: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题



Hi,

应该是watermark没有达到window的end时间,导致window没有fire。watermark的相关内容可以看这里[1]。其次,你也可以通过job的运行页面[2]查看job当前watermark的值。

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
[2]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time

On Fri, Jul 12, 2019 at 4:05 PM Ever <[hidden email]> wrote:

> 有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。
> 数据每隔10秒会过来一批。
> 代码如下图:
> ```
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
>
>
> env.setParallelism(parallel)
>
>
> env.addSource(source)
>       .map(json => {
>           new InvokingInfoWrapper(xxx)
>         })
>       .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5))
> {
>         override def extractTimestamp(invoking: InvokingInfoWrapper): Long
> = {
>           invoking.timestamp
>         }
>       })
>       .keyBy(invokingInfo => {
>         s"${invokingInfo.caller}_${invokingInfo.callee}"
>       })
>       .timeWindow(Time.seconds(60), Time.seconds(10))
>       .reduce(innerReducer).map(invokingInfo => { // ##2map
>       //some mapping code
>       invokingInfo
>       })
>       .addSink(new
> WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink")
>
> ```
>
>
>
>
> 由于是在预发布环境上线, 流量不大,我观察到一个现场如下:
> 1. 第一条数据的时间戳为03:15:48
> 2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口)
> 3. 第三条数据的时间戳为03:16:06,   触发reduce操作(同样5次)
> 4. 第四条数据的时间戳为03:17:55,
>  这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤, 然而并没有。
> 5. 第五条数据的时间戳为03:18:01,  这时候触发了跟第四条数据的reduce操作。
>
>
> 感觉前三条数据给吞了。
>
>
> 为什么呢?
Reply | Threaded
Open this post in threaded view
|

答复: 回复: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题

Yuan,Youjun
并不是没条消息会触发watermark,而是有一定时间间隔的,默认是200ms触发一次watermark。
当你的数据来的比较集中的时候,经常会发生最新的消息的时间戳已经过了window end,但是window还没fire的情况。


-----邮件原件-----
发件人: Ever <[hidden email]>
发送时间: Sunday, July 14, 2019 5:00 PM
收件人: user-zh <[hidden email]>
主题: 回复: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题

第四条数据来的时间戳是: 03:17:55,  水印时间这时候应该是03:17:50,  不管是大窗口的关闭时间(第一条数据(03:15:48)的大窗口关闭时间:03:16:50)还是小的滑动窗口关闭时间, 都已经过了, 都应该关闭了啊




------------------ 原始邮件 ------------------
发件人: "Hequn Cheng"<[hidden email]>;
发送时间: 2019年7月14日(星期天) 中午11:55
收件人: "user-zh"<[hidden email]>;

主题: Re: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题



Hi,

应该是watermark没有达到window的end时间,导致window没有fire。watermark的相关内容可以看这里[1]。其次,你也可以通过job的运行页面[2]查看job当前watermark的值。

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
[2]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time

On Fri, Jul 12, 2019 at 4:05 PM Ever <[hidden email]> wrote:

> 有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。
> 数据每隔10秒会过来一批。
> 代码如下图:
> ```
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
>
>
> env.setParallelism(parallel)
>
>
> env.addSource(source)
>       .map(json => {
>           new InvokingInfoWrapper(xxx)
>         })
>       .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seco
> nds(5))
> {
>         override def extractTimestamp(invoking: InvokingInfoWrapper):
> Long = {
>           invoking.timestamp
>         }
>       })
>       .keyBy(invokingInfo => {
>         s"${invokingInfo.caller}_${invokingInfo.callee}"
>       })
>       .timeWindow(Time.seconds(60), Time.seconds(10))
>       .reduce(innerReducer).map(invokingInfo => { // ##2map
>       //some mapping code
>       invokingInfo
>       })
>       .addSink(new
> WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-s
> ink")
>
> ```
>
>
>
>
> 由于是在预发布环境上线, 流量不大,我观察到一个现场如下:
> 1. 第一条数据的时间戳为03:15:48
> 2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口)
> 3. 第三条数据的时间戳为03:16:06,   触发reduce操作(同样5次)
> 4. 第四条数据的时间戳为03:17:55,
>  这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤, 然而并没有。
> 5. 第五条数据的时间戳为03:18:01,  这时候触发了跟第四条数据的reduce操作。
>
>
> 感觉前三条数据给吞了。
>
>
> 为什么呢?