如何让Flink trigger只输出有变化的数据?

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

如何让Flink trigger只输出有变化的数据?

Qi Kang
Hi,

我们有一个按自然天聚合统计各站点销量和GMV数据的Flink任务,代码框架如下:

```
sourceStream
  .map(message -> JSON.parseObject(message, OrderDetail.class))
  .keyby("siteId")
  .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
  .aggregate(new VolumeGmvAggregateFunc());
```

为了能够让dashboard实时刷新数据,每秒会触发一次计算。但是站点有将近1000个,每秒都输出全量结果不太现实,请问有什么简便的方法能够仅输出一秒内发生过变化的站点数据呢?Thx.


Reply | Threaded
Open this post in threaded view
|

回复:如何让Flink trigger只输出有变化的数据?

Jun Zhang-2
你可以指定一个Evictor,删除窗口中处理过的数据。

Best  Jun


------------------ 原始邮件 ------------------
发件人: Qi Kang <[hidden email]&gt;
发送时间: 2019年11月1日 16:37
收件人: user-zh <[hidden email]&gt;
主题: 回复:如何让Flink trigger只输出有变化的数据?



Hi,

我们有一个按自然天聚合统计各站点销量和GMV数据的Flink任务,代码框架如下:

```
sourceStream
&nbsp; .map(message -&gt; JSON.parseObject(message, OrderDetail.class))
&nbsp; .keyby("siteId")
&nbsp; .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
&nbsp; .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
&nbsp; .aggregate(new VolumeGmvAggregateFunc());
```

为了能够让dashboard实时刷新数据,每秒会触发一次计算。但是站点有将近1000个,每秒都输出全量结果不太现实,请问有什么简便的方法能够仅输出一秒内发生过变化的站点数据呢?Thx.
Reply | Threaded
Open this post in threaded view
|

Re: 如何让Flink trigger只输出有变化的数据?

ZhangChangjun
我们遇到过类似的问题,最终sink前可将key的结果存入state中,通过对比state中的结果与最终结果是否一致,去决定是否需要collect结果。
觉得不是最好的解决办法,请教下如何通过Evictor剔除未更新的数据,不知是否方便详细说一下实现方式,谢谢!

> 2019年11月1日 下午8:41,Jun Zhang <[hidden email]> 写道:
>
> 你可以指定一个Evictor,删除窗口中处理过的数据。
>
> Best&nbsp;&nbsp;Jun
>
>
> ------------------ 原始邮件 ------------------
> 发件人: Qi Kang <[hidden email]&gt;
> 发送时间: 2019年11月1日 16:37
> 收件人: user-zh <[hidden email]&gt;
> 主题: 回复:如何让Flink trigger只输出有变化的数据?
>
>
>
> Hi,
>
> 我们有一个按自然天聚合统计各站点销量和GMV数据的Flink任务,代码框架如下:
>
> ```
> sourceStream
> &nbsp; .map(message -&gt; JSON.parseObject(message, OrderDetail.class))
> &nbsp; .keyby("siteId")
> &nbsp; .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
> &nbsp; .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
> &nbsp; .aggregate(new VolumeGmvAggregateFunc());
> ```
>
> 为了能够让dashboard实时刷新数据,每秒会触发一次计算。但是站点有将近1000个,每秒都输出全量结果不太现实,请问有什么简便的方法能够仅输出一秒内发生过变化的站点数据呢?Thx.