基于事件时间的滑动窗口,如何加速窗口的关闭

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

基于事件时间的滑动窗口,如何加速窗口的关闭

Ever
一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。

正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。

但是我们的数据源会每10秒集中过来一批数据。


现在的问题是, 数据过来后, 窗口没法及时关闭, 而是要等下一个10秒的数据到来后, 才会触发窗口关闭。


这种场景下, 如何加速窗口关闭?
例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据,
但是, 到9:02:3x的时候收到9:02:2x的数据的时候, 才会关闭9:02:1x这个窗口期。


这时候计算结果会延迟10秒+。


如何加速这个窗口的关闭呢?
如果想在收到的最后一条数据的时间戳+watermark之后就关闭窗口, 如何设置?
Reply | Threaded
Open this post in threaded view
|

Re: 基于事件时间的滑动窗口,如何加速窗口的关闭

Biao Liu
你好,看完你的描述,有一些疑问

> “一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
> 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
> 但是我们的数据源会每10秒集中过来一批数据。”

1. 你的 sliding window 不是10秒吗?不就应该10秒输出一次吗?

> 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据...

2. 你用了 watermark,但是又和 wall clock 比较,到底是基于 processing time 还是 event time?

3. 关于窗口关闭这个描述,没看懂,不如贴代码来的直接

4. 关于 window 的文档,可以参考
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#sliding-windows


Ever <[hidden email]> 于2019年7月10日周三 上午10:39写道:

> 一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
>
> 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
>
> 但是我们的数据源会每10秒集中过来一批数据。
>
>
> 现在的问题是, 数据过来后, 窗口没法及时关闭, 而是要等下一个10秒的数据到来后, 才会触发窗口关闭。
>
>
> 这种场景下, 如何加速窗口关闭?
> 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据,
> 但是, 到9:02:3x的时候收到9:02:2x的数据的时候, 才会关闭9:02:1x这个窗口期。
>
>
> 这时候计算结果会延迟10秒+。
>
>
> 如何加速这个窗口的关闭呢?
> 如果想在收到的最后一条数据的时间戳+watermark之后就关闭窗口, 如何设置?
Reply | Threaded
Open this post in threaded view
|

回复: 基于事件时间的滑动窗口,如何加速窗口的关闭

Ever
是10秒输出一次。 但每次的结果都会延迟10几秒。


问题的关键在于数据源是通过定时器批量发数据过来, 每10秒发一批, 发完再等10秒才发下一批。 flink收到一批数据后, 还得等window关闭才能处理这批数据并sink出去。 而window的关闭又要等下一批数据的到来。


下面是示范代码, source是kafka。
```
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)


env.setParallelism(parallel)


env.addSource(source)
      .map(json => {
          new InvokingInfoWrapper(xxx)
        })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5)) {
        override def extractTimestamp(invoking: InvokingInfoWrapper): Long = {
          invoking.timestamp
        }
      })
      .keyBy(invokingInfo => {
        s"${invokingInfo.caller}_${invokingInfo.callee}"
      })
      .timeWindow(Time.seconds(60), Time.seconds(10))
      .reduce(innerReducer).map(invokingInfo => {
      //some mapping code
      invokingInfo
      })
      .addSink(new WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink")

```


------------------ 原始邮件 ------------------
发件人: "Biao Liu"<[hidden email]>;
发送时间: 2019年7月10日(星期三) 中午11:34
收件人: "user-zh"<[hidden email]>;

主题: Re: 基于事件时间的滑动窗口,如何加速窗口的关闭



你好,看完你的描述,有一些疑问

> “一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
> 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
> 但是我们的数据源会每10秒集中过来一批数据。”

1. 你的 sliding window 不是10秒吗?不就应该10秒输出一次吗?

> 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据...

2. 你用了 watermark,但是又和 wall clock 比较,到底是基于 processing time 还是 event time?

3. 关于窗口关闭这个描述,没看懂,不如贴代码来的直接

4. 关于 window 的文档,可以参考
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#sliding-windows


Ever <[hidden email]> 于2019年7月10日周三 上午10:39写道:

> 一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
>
> 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
>
> 但是我们的数据源会每10秒集中过来一批数据。
>
>
> 现在的问题是, 数据过来后, 窗口没法及时关闭, 而是要等下一个10秒的数据到来后, 才会触发窗口关闭。
>
>
> 这种场景下, 如何加速窗口关闭?
> 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据,
> 但是, 到9:02:3x的时候收到9:02:2x的数据的时候, 才会关闭9:02:1x这个窗口期。
>
>
> 这时候计算结果会延迟10秒+。
>
>
> 如何加速这个窗口的关闭呢?
> 如果想在收到的最后一条数据的时间戳+watermark之后就关闭窗口, 如何设置?
Reply | Threaded
Open this post in threaded view
|

Re: 基于事件时间的滑动窗口,如何加速窗口的关闭

Biao Liu
大概明白了你的场景,果然贴代码好很多 :)

我理解你的问题是:
1. source 数据不连续,10秒一个 batch
2. 而你用的又是 event time,event time 依赖数据提供的时间
3. 没数据来的这个间隙,event time 无法更新,没有 watermark 发下去,也就导致 window 不能关闭

我能想到的几个方面
1. 你真的需要 event time 吗?如果用 processing time 就没这问题
2. 10秒一个 batch 的数据源,可以优化吗?如果是连续输入,也就没有这问题了
3. 语义上来说,你的下一批数据不到,Flink 也没办法发送 watermark,因为不知道会不会来更早的数据,这个是你使用的模型决定的。Flink
提供了一套机制来解决类似问题,不过不确定你的 source 是否能支持,可以参考下. [1]

1.
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#idling-sources


Ever <[hidden email]> 于2019年7月10日周三 上午11:45写道:

> 是10秒输出一次。 但每次的结果都会延迟10几秒。
>
> 问题的关键在于数据源是通过定时器批量发数据过来, 每10秒发一批, 发完再等10秒才发下一批。 flink收到一批数据后,
> 还得等window关闭才能处理这批数据并sink出去。 而window的关闭又要等下一批数据的到来。
>
> 下面是示范代码, source是kafka。
> ```
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
>
> env.setParallelism(parallel)
>
> env.addSource(source)
>       .map(json => {
>           new InvokingInfoWrapper(xxx)
>         })
>       .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5))
> {
>         override def extractTimestamp(invoking: InvokingInfoWrapper): Long
> = {
>           invoking.timestamp
>         }
>       })
>       .keyBy(invokingInfo => {
>         s"${invokingInfo.caller}_${invokingInfo.callee}"
>       })
>       .timeWindow(Time.seconds(60), Time.seconds(10))
>       .reduce(innerReducer).map(invokingInfo => {
>       //some mapping code
>       invokingInfo
>       })
>       .addSink(new
> WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink")
> ```
>
> ------------------ 原始邮件 ------------------
> *发件人:* "Biao Liu"<[hidden email]>;
> *发送时间:* 2019年7月10日(星期三) 中午11:34
> *收件人:* "user-zh"<[hidden email]>;
> *主题:* Re: 基于事件时间的滑动窗口,如何加速窗口的关闭
>
> 你好,看完你的描述,有一些疑问
>
> > “一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
> > 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
> > 但是我们的数据源会每10秒集中过来一批数据。”
>
> 1. 你的 sliding window 不是10秒吗?不就应该10秒输出一次吗?
>
> > 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据...
>
> 2. 你用了 watermark,但是又和 wall clock 比较,到底是基于 processing time 还是 event time?
>
> 3. 关于窗口关闭这个描述,没看懂,不如贴代码来的直接
>
> 4. 关于 window 的文档,可以参考
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#sliding-windows
>
>
> Ever <[hidden email]> 于2019年7月10日周三 上午10:39写道:
>
> > 一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
> >
> > 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
> >
> > 但是我们的数据源会每10秒集中过来一批数据。
> >
> >
> > 现在的问题是, 数据过来后, 窗口没法及时关闭, 而是要等下一个10秒的数据到来后, 才会触发窗口关闭。
> >
> >
> > 这种场景下, 如何加速窗口关闭?
> > 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据,
> > 但是, 到9:02:3x的时候收到9:02:2x的数据的时候, 才会关闭9:02:1x这个窗口期。
> >
> >
> > 这时候计算结果会延迟10秒+。
> >
> >
> > 如何加速这个窗口的关闭呢?
> > 如果想在收到的最后一条数据的时间戳+watermark之后就关闭窗口, 如何设置?
>
Reply | Threaded
Open this post in threaded view
|

回复: 基于事件时间的滑动窗口,如何加速窗口的关闭

Ever
感谢大佬。


因为我是在做服务监控, 数据采集端会简单聚合10秒内的数据再发上来, 如果每条数据都发上来的话, 那就太多了(生产环境超过万台服务器)。


你说的flink这个机制, 我之前也看过,会尝试一下。 但不知道这是否是业内最有效的方案, 所以就发了这个问题出来。


再次感谢。


------------------ 原始邮件 ------------------
发件人: "mmyy1110"<[hidden email]>;
发送时间: 2019年7月10日(星期三) 中午12:24
收件人: "Ever"<[hidden email]>;
抄送: "user-zh"<[hidden email]>;
主题: Re: 基于事件时间的滑动窗口,如何加速窗口的关闭



大概明白了你的场景,果然贴代码好很多 :)

我理解你的问题是:
1. source 数据不连续,10秒一个 batch
2. 而你用的又是 event time,event time 依赖数据提供的时间
3. 没数据来的这个间隙,event time 无法更新,没有 watermark 发下去,也就导致 window 不能关闭


我能想到的几个方面
1. 你真的需要 event time 吗?如果用 processing time 就没这问题
2. 10秒一个 batch 的数据源,可以优化吗?如果是连续输入,也就没有这问题了
3. 语义上来说,你的下一批数据不到,Flink 也没办法发送 watermark,因为不知道会不会来更早的数据,这个是你使用的模型决定的。Flink 提供了一套机制来解决类似问题,不过不确定你的 source 是否能支持,可以参考下. [1]


1. https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#idling-sources




Ever <[hidden email]> 于2019年7月10日周三 上午11:45写道:

是10秒输出一次。 但每次的结果都会延迟10几秒。


问题的关键在于数据源是通过定时器批量发数据过来, 每10秒发一批, 发完再等10秒才发下一批。 flink收到一批数据后, 还得等window关闭才能处理这批数据并sink出去。 而window的关闭又要等下一批数据的到来。


下面是示范代码, source是kafka。
```
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)


env.setParallelism(parallel)


env.addSource(source)
      .map(json => {
          new InvokingInfoWrapper(xxx)
        })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5)) {
        override def extractTimestamp(invoking: InvokingInfoWrapper): Long = {
          invoking.timestamp
        }
      })
      .keyBy(invokingInfo => {
        s"${invokingInfo.caller}_${invokingInfo.callee}"
      })
      .timeWindow(Time.seconds(60), Time.seconds(10))
      .reduce(innerReducer).map(invokingInfo => {
      //some mapping code
      invokingInfo
      })
      .addSink(new WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink")

```


------------------ 原始邮件 ------------------
发件人: "Biao Liu"<[hidden email]>;
发送时间: 2019年7月10日(星期三) 中午11:34
收件人: "user-zh"<[hidden email]>;

主题: Re: 基于事件时间的滑动窗口,如何加速窗口的关闭



你好,看完你的描述,有一些疑问

> “一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
> 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
> 但是我们的数据源会每10秒集中过来一批数据。”

1. 你的 sliding window 不是10秒吗?不就应该10秒输出一次吗?

> 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据...

2. 你用了 watermark,但是又和 wall clock 比较,到底是基于 processing time 还是 event time?

3. 关于窗口关闭这个描述,没看懂,不如贴代码来的直接

4. 关于 window 的文档,可以参考
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#sliding-windows


Ever <[hidden email]> 于2019年7月10日周三 上午10:39写道:

> 一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
>
> 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
>
> 但是我们的数据源会每10秒集中过来一批数据。
>
>
> 现在的问题是, 数据过来后, 窗口没法及时关闭, 而是要等下一个10秒的数据到来后, 才会触发窗口关闭。
>
>
> 这种场景下, 如何加速窗口关闭?
> 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据,
> 但是, 到9:02:3x的时候收到9:02:2x的数据的时候, 才会关闭9:02:1x这个窗口期。
>
>
> 这时候计算结果会延迟10秒+。
>
>
> 如何加速这个窗口的关闭呢?
> 如果想在收到的最后一条数据的时间戳+watermark之后就关闭窗口, 如何设置?