回复:flink窗口函数AggregateFunction中,merge的作用和应用场景

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

回复:flink窗口函数AggregateFunction中,merge的作用和应用场景

Cayden chen
官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口



---原始邮件---
发件人: "Zhefu PENG"<[hidden email]&gt;
发送时间: 2020年5月6日(周三) 晚上8:34
收件人: "user-zh"<[hidden email]&gt;;
主题: flink窗口函数AggregateFunction中,merge的作用和应用场景


Hi all,

在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。

之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。

代码(简单伪代码)如下:
简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
第二项是计算的方差结果,输出的结果是
class MergeAggregateExample extends AggregateFunction[Double, (Set,
Double), Double] {
&nbsp;&nbsp;&nbsp; override def createAccumulator(): (Set, Double) = {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //Set用来对数据进行去重, 后一项是计算方差后的结果
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; (Set(), 0.0)
&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp; override def add(item: Double, accumulator: (Set, Double)): (Set,
Double) = {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //判断新来的数据是否之前出现过,如果之前没出现过: 更新set, 并且增量更新方差;如果没出现过,直接跳过
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new Tuple2(Set, Double)
&nbsp;&nbsp;&nbsp; }


&nbsp;&nbsp;&nbsp; override def getResult(accumulator: (Set, Double)): Double = {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 返回计算的方差结果
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; accumulator._2
&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp; override def merge(a: (Set, Double), b: (Set, Double)): (Set, Double) =
{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
&nbsp; }


目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。

Looking forward to your reply and help.

Best,
Zhefu
Reply | Threaded
Open this post in threaded view
|

Re: flink窗口函数AggregateFunction中,merge的作用和应用场景

zhefu
非常感谢。那我是不是能理解为:我在这里用的是sliding time
window,就不会用到merge功能,所以merge的功能不会影响到我结果的准确性?

On Wed, May 6, 2020 at 20:49 1193216154 <[hidden email]> wrote:

> 官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口
>
>
>
> ---原始邮件---
> 发件人: "Zhefu PENG"<[hidden email]&gt;
> 发送时间: 2020年5月6日(周三) 晚上8:34
> 收件人: "user-zh"<[hidden email]&gt;;
> 主题: flink窗口函数AggregateFunction中,merge的作用和应用场景
>
>
> Hi all,
>
>
> 在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。
>
>
> 之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
>
> 但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。
>
> 代码(简单伪代码)如下:
> 简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
> 第二项是计算的方差结果,输出的结果是
> class MergeAggregateExample extends AggregateFunction[Double, (Set,
> Double), Double] {
> &nbsp;&nbsp;&nbsp; override def createAccumulator(): (Set, Double) = {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //Set用来对数据进行去重, 后一项是计算方差后的结果
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; (Set(), 0.0)
> &nbsp;&nbsp;&nbsp; }
>
> &nbsp;&nbsp;&nbsp; override def add(item: Double, accumulator: (Set,
> Double)): (Set,
> Double) = {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //判断新来的数据是否之前出现过,如果之前没出现过: 更新set,
> 并且增量更新方差;如果没出现过,直接跳过
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new Tuple2(Set, Double)
> &nbsp;&nbsp;&nbsp; }
>
>
> &nbsp;&nbsp;&nbsp; override def getResult(accumulator: (Set, Double)):
> Double = {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 返回计算的方差结果
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; accumulator._2
> &nbsp;&nbsp;&nbsp; }
>
> &nbsp;&nbsp;&nbsp; override def merge(a: (Set, Double), b: (Set, Double)):
> (Set, Double) =
> {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
> &nbsp; }
>
>
> 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。
>
> Looking forward to your reply and help.
>
> Best,
> Zhefu
Reply | Threaded
Open this post in threaded view
|

Re: flink窗口函数AggregateFunction中,merge的作用和应用场景

Benchao Li
Hi,

如果你用的sliding window,应该是也需要实现merge的。因为sliding window本身做了一个pane的优化。
会按照sliding size和window size取最大公约数作为pane,然后在trigger window计算的时候,把属于这个
window的pane都merge起来。

Zhefu PENG <[hidden email]> 于2020年5月6日周三 下午9:05写道:

> 非常感谢。那我是不是能理解为:我在这里用的是sliding time
> window,就不会用到merge功能,所以merge的功能不会影响到我结果的准确性?
>
> On Wed, May 6, 2020 at 20:49 1193216154 <[hidden email]> wrote:
>
> > 官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口
> >
> >
> >
> > ---原始邮件---
> > 发件人: "Zhefu PENG"<[hidden email]&gt;
> > 发送时间: 2020年5月6日(周三) 晚上8:34
> > 收件人: "user-zh"<[hidden email]&gt;;
> > 主题: flink窗口函数AggregateFunction中,merge的作用和应用场景
> >
> >
> > Hi all,
> >
> >
> >
> 在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。
> >
> >
> >
> 之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
> >
> >
> 但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。
> >
> > 代码(简单伪代码)如下:
> > 简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
> > 第二项是计算的方差结果,输出的结果是
> > class MergeAggregateExample extends AggregateFunction[Double, (Set,
> > Double), Double] {
> > &nbsp;&nbsp;&nbsp; override def createAccumulator(): (Set, Double) = {
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //Set用来对数据进行去重, 后一项是计算方差后的结果
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; (Set(), 0.0)
> > &nbsp;&nbsp;&nbsp; }
> >
> > &nbsp;&nbsp;&nbsp; override def add(item: Double, accumulator: (Set,
> > Double)): (Set,
> > Double) = {
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //判断新来的数据是否之前出现过,如果之前没出现过: 更新set,
> > 并且增量更新方差;如果没出现过,直接跳过
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new Tuple2(Set, Double)
> > &nbsp;&nbsp;&nbsp; }
> >
> >
> > &nbsp;&nbsp;&nbsp; override def getResult(accumulator: (Set, Double)):
> > Double = {
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 返回计算的方差结果
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; accumulator._2
> > &nbsp;&nbsp;&nbsp; }
> >
> > &nbsp;&nbsp;&nbsp; override def merge(a: (Set, Double), b: (Set,
> Double)):
> > (Set, Double) =
> > {
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
> > &nbsp; }
> >
> >
> > 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。
> >
> > Looking forward to your reply and help.
> >
> > Best,
> > Zhefu
>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: flink窗口函数AggregateFunction中,merge的作用和应用场景

Jingsong Li
Hi,

首先merge会不会导致结果不准确是完全看你的实现,这里你 的需求看起来并没有实现上的困难。

merge这个接口就是为了优化性能才提出的,就像Benchao说的那样,如果你实现了merge,引擎会给做一些pane的优化。

当然前提是merge是有效率提升的,具体根据你的实现和测试来判断。

Best,
Jingsong Lee

On Wed, May 6, 2020 at 9:22 PM Benchao Li <[hidden email]> wrote:

> Hi,
>
> 如果你用的sliding window,应该是也需要实现merge的。因为sliding window本身做了一个pane的优化。
> 会按照sliding size和window size取最大公约数作为pane,然后在trigger window计算的时候,把属于这个
> window的pane都merge起来。
>
> Zhefu PENG <[hidden email]> 于2020年5月6日周三 下午9:05写道:
>
> > 非常感谢。那我是不是能理解为:我在这里用的是sliding time
> > window,就不会用到merge功能,所以merge的功能不会影响到我结果的准确性?
> >
> > On Wed, May 6, 2020 at 20:49 1193216154 <[hidden email]> wrote:
> >
> > > 官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口
> > >
> > >
> > >
> > > ---原始邮件---
> > > 发件人: "Zhefu PENG"<[hidden email]&gt;
> > > 发送时间: 2020年5月6日(周三) 晚上8:34
> > > 收件人: "user-zh"<[hidden email]&gt;;
> > > 主题: flink窗口函数AggregateFunction中,merge的作用和应用场景
> > >
> > >
> > > Hi all,
> > >
> > >
> > >
> >
> 在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。
> > >
> > >
> > >
> >
> 之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
> > >
> > >
> >
> 但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。
> > >
> > > 代码(简单伪代码)如下:
> > > 简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
> > > 第二项是计算的方差结果,输出的结果是
> > > class MergeAggregateExample extends AggregateFunction[Double, (Set,
> > > Double), Double] {
> > > &nbsp;&nbsp;&nbsp; override def createAccumulator(): (Set, Double) = {
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //Set用来对数据进行去重, 后一项是计算方差后的结果
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; (Set(), 0.0)
> > > &nbsp;&nbsp;&nbsp; }
> > >
> > > &nbsp;&nbsp;&nbsp; override def add(item: Double, accumulator: (Set,
> > > Double)): (Set,
> > > Double) = {
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //判断新来的数据是否之前出现过,如果之前没出现过: 更新set,
> > > 并且增量更新方差;如果没出现过,直接跳过
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new Tuple2(Set, Double)
> > > &nbsp;&nbsp;&nbsp; }
> > >
> > >
> > > &nbsp;&nbsp;&nbsp; override def getResult(accumulator: (Set, Double)):
> > > Double = {
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 返回计算的方差结果
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; accumulator._2
> > > &nbsp;&nbsp;&nbsp; }
> > >
> > > &nbsp;&nbsp;&nbsp; override def merge(a: (Set, Double), b: (Set,
> > Double)):
> > > (Set, Double) =
> > > {
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
> > > &nbsp; }
> > >
> > >
> > > 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。
> > >
> > > Looking forward to your reply and help.
> > >
> > > Best,
> > > Zhefu
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [hidden email]; [hidden email]
>


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: flink窗口函数AggregateFunction中,merge的作用和应用场景

zhefu
Hi Benchao & Jingsong,

谢谢你们的回复。的确使用sliding time window也是需要实现merge的。
这里有个额外问题想问一下Jingsong,就是在我当前这个需求场景下,能否给一些提示,关于如果merge以后,怎么准确又高效地更新结果的方差值呢?我目前想到的是在add里面可以增量更新方差值,同时用set记录每个出现的数值,然后在merge的时候,将set进行合并,重新计算一遍。保证结果的准确。(没有想到以增量更新的方式准确更新merge后的结果)。可以些idea吗

Thanks,
Zhefu

On Wed, May 6, 2020 at 22:00 Jingsong Li <[hidden email]> wrote:

> Hi,
>
> 首先merge会不会导致结果不准确是完全看你的实现,这里你 的需求看起来并没有实现上的困难。
>
> merge这个接口就是为了优化性能才提出的,就像Benchao说的那样,如果你实现了merge,引擎会给做一些pane的优化。
>
> 当然前提是merge是有效率提升的,具体根据你的实现和测试来判断。
>
> Best,
> Jingsong Lee
>
> On Wed, May 6, 2020 at 9:22 PM Benchao Li <[hidden email]> wrote:
>
> > Hi,
> >
> > 如果你用的sliding window,应该是也需要实现merge的。因为sliding window本身做了一个pane的优化。
> > 会按照sliding size和window size取最大公约数作为pane,然后在trigger window计算的时候,把属于这个
> > window的pane都merge起来。
> >
> > Zhefu PENG <[hidden email]> 于2020年5月6日周三 下午9:05写道:
> >
> > > 非常感谢。那我是不是能理解为:我在这里用的是sliding time
> > > window,就不会用到merge功能,所以merge的功能不会影响到我结果的准确性?
> > >
> > > On Wed, May 6, 2020 at 20:49 1193216154 <[hidden email]> wrote:
> > >
> > > > 官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口
> > > >
> > > >
> > > >
> > > > ---原始邮件---
> > > > 发件人: "Zhefu PENG"<[hidden email]&gt;
> > > > 发送时间: 2020年5月6日(周三) 晚上8:34
> > > > 收件人: "user-zh"<[hidden email]&gt;;
> > > > 主题: flink窗口函数AggregateFunction中,merge的作用和应用场景
> > > >
> > > >
> > > > Hi all,
> > > >
> > > >
> > > >
> > >
> >
> 在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。
> > > >
> > > >
> > > >
> > >
> >
> 之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
> > > >
> > > >
> > >
> >
> 但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。
> > > >
> > > > 代码(简单伪代码)如下:
> > > > 简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
> > > > 第二项是计算的方差结果,输出的结果是
> > > > class MergeAggregateExample extends AggregateFunction[Double, (Set,
> > > > Double), Double] {
> > > > &nbsp;&nbsp;&nbsp; override def createAccumulator(): (Set, Double) =
> {
> > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //Set用来对数据进行去重, 后一项是计算方差后的结果
> > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; (Set(), 0.0)
> > > > &nbsp;&nbsp;&nbsp; }
> > > >
> > > > &nbsp;&nbsp;&nbsp; override def add(item: Double, accumulator: (Set,
> > > > Double)): (Set,
> > > > Double) = {
> > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //判断新来的数据是否之前出现过,如果之前没出现过: 更新set,
> > > > 并且增量更新方差;如果没出现过,直接跳过
> > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new Tuple2(Set, Double)
> > > > &nbsp;&nbsp;&nbsp; }
> > > >
> > > >
> > > > &nbsp;&nbsp;&nbsp; override def getResult(accumulator: (Set,
> Double)):
> > > > Double = {
> > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 返回计算的方差结果
> > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; accumulator._2
> > > > &nbsp;&nbsp;&nbsp; }
> > > >
> > > > &nbsp;&nbsp;&nbsp; override def merge(a: (Set, Double), b: (Set,
> > > Double)):
> > > > (Set, Double) =
> > > > {
> > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
> > > > &nbsp; }
> > > >
> > > >
> > > > 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。
> > > >
> > > > Looking forward to your reply and help.
> > > >
> > > > Best,
> > > > Zhefu
> > >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: [hidden email]; [hidden email]
> >
>
>
> --
> Best, Jingsong Lee
>
Reply | Threaded
Open this post in threaded view
|

Re: flink窗口函数AggregateFunction中,merge的作用和应用场景

Jingsong Li
Hi,

重新计算一遍当然是正确的。

一个方式是参考Hive[1], Agg buffer需要保存count,sum,variance.

另一个方式是考虑分离distinct和variance,你试过直接用flink内置函数吗?比如variance(distinct item)?
当内置函数为variance时,会做一些特殊的plan改写优化。

[1]
https://github.com/apache/hive/blob/0966a383d48348c36c270ddbcba2b4516c6f3a24/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFVariance.java

Best,
Jingsong Lee

On Thu, May 7, 2020 at 9:28 AM Zhefu PENG <[hidden email]> wrote:

> Hi Benchao & Jingsong,
>
> 谢谢你们的回复。的确使用sliding time window也是需要实现merge的。
>
> 这里有个额外问题想问一下Jingsong,就是在我当前这个需求场景下,能否给一些提示,关于如果merge以后,怎么准确又高效地更新结果的方差值呢?我目前想到的是在add里面可以增量更新方差值,同时用set记录每个出现的数值,然后在merge的时候,将set进行合并,重新计算一遍。保证结果的准确。(没有想到以增量更新的方式准确更新merge后的结果)。可以些idea吗
>
> Thanks,
> Zhefu
>
> On Wed, May 6, 2020 at 22:00 Jingsong Li <[hidden email]> wrote:
>
> > Hi,
> >
> > 首先merge会不会导致结果不准确是完全看你的实现,这里你 的需求看起来并没有实现上的困难。
> >
> > merge这个接口就是为了优化性能才提出的,就像Benchao说的那样,如果你实现了merge,引擎会给做一些pane的优化。
> >
> > 当然前提是merge是有效率提升的,具体根据你的实现和测试来判断。
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, May 6, 2020 at 9:22 PM Benchao Li <[hidden email]> wrote:
> >
> > > Hi,
> > >
> > > 如果你用的sliding window,应该是也需要实现merge的。因为sliding window本身做了一个pane的优化。
> > > 会按照sliding size和window size取最大公约数作为pane,然后在trigger window计算的时候,把属于这个
> > > window的pane都merge起来。
> > >
> > > Zhefu PENG <[hidden email]> 于2020年5月6日周三 下午9:05写道:
> > >
> > > > 非常感谢。那我是不是能理解为:我在这里用的是sliding time
> > > > window,就不会用到merge功能,所以merge的功能不会影响到我结果的准确性?
> > > >
> > > > On Wed, May 6, 2020 at 20:49 1193216154 <[hidden email]> wrote:
> > > >
> > > > > 官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口
> > > > >
> > > > >
> > > > >
> > > > > ---原始邮件---
> > > > > 发件人: "Zhefu PENG"<[hidden email]&gt;
> > > > > 发送时间: 2020年5月6日(周三) 晚上8:34
> > > > > 收件人: "user-zh"<[hidden email]&gt;;
> > > > > 主题: flink窗口函数AggregateFunction中,merge的作用和应用场景
> > > > >
> > > > >
> > > > > Hi all,
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
> > > > >
> > > > >
> > > >
> > >
> >
> 但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。
> > > > >
> > > > > 代码(简单伪代码)如下:
> > > > > 简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
> > > > > 第二项是计算的方差结果,输出的结果是
> > > > > class MergeAggregateExample extends AggregateFunction[Double, (Set,
> > > > > Double), Double] {
> > > > > &nbsp;&nbsp;&nbsp; override def createAccumulator(): (Set, Double)
> =
> > {
> > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //Set用来对数据进行去重, 后一项是计算方差后的结果
> > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; (Set(), 0.0)
> > > > > &nbsp;&nbsp;&nbsp; }
> > > > >
> > > > > &nbsp;&nbsp;&nbsp; override def add(item: Double, accumulator:
> (Set,
> > > > > Double)): (Set,
> > > > > Double) = {
> > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //判断新来的数据是否之前出现过,如果之前没出现过: 更新set,
> > > > > 并且增量更新方差;如果没出现过,直接跳过
> > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new Tuple2(Set, Double)
> > > > > &nbsp;&nbsp;&nbsp; }
> > > > >
> > > > >
> > > > > &nbsp;&nbsp;&nbsp; override def getResult(accumulator: (Set,
> > Double)):
> > > > > Double = {
> > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 返回计算的方差结果
> > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; accumulator._2
> > > > > &nbsp;&nbsp;&nbsp; }
> > > > >
> > > > > &nbsp;&nbsp;&nbsp; override def merge(a: (Set, Double), b: (Set,
> > > > Double)):
> > > > > (Set, Double) =
> > > > > {
> > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //
> 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
> > > > > &nbsp; }
> > > > >
> > > > >
> > > > > 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。
> > > > >
> > > > > Looking forward to your reply and help.
> > > > >
> > > > > Best,
> > > > > Zhefu
> > > >
> > >
> > >
> > > --
> > >
> > > Benchao Li
> > > School of Electronics Engineering and Computer Science, Peking
> University
> > > Tel:+86-15650713730
> > > Email: [hidden email]; [hidden email]
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: flink窗口函数AggregateFunction中,merge的作用和应用场景

zhefu
Hi Jingsong,

如你所说,[1]就是一种增量更新方差的方式,这个也是我之前查到的一种方式,并打算在add内使用;但是我的问题是,在merge的时候,怎么让已经在之前算过的两批数据结果的准确性呢?比如,一个节点算过1,2,3的数据,另一个节点算过1,3,4的数据,ACC会保存总和,个数,方差值;但是在merge时候,借用ACC里留存的数据,使用[1]的增量更新方式,
就会造成结果的偏差啊(原因是因为数据需要去重,merge以后实现的其实是1,2,3,4的数据的方差计算,而不是1,2,3,1,3,4的计算)。这个怎么解决呢?

关于使用built-in的variance功能,我会去看一下,感谢你的帮助

Best,
Zhefu

Jingsong Li <[hidden email]> 于2020年5月7日周四 上午9:51写道:

> Hi,
>
> 重新计算一遍当然是正确的。
>
> 一个方式是参考Hive[1], Agg buffer需要保存count,sum,variance.
>
> 另一个方式是考虑分离distinct和variance,你试过直接用flink内置函数吗?比如variance(distinct item)?
> 当内置函数为variance时,会做一些特殊的plan改写优化。
>
> [1]
>
> https://github.com/apache/hive/blob/0966a383d48348c36c270ddbcba2b4516c6f3a24/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFVariance.java
>
> Best,
> Jingsong Lee
>
> On Thu, May 7, 2020 at 9:28 AM Zhefu PENG <[hidden email]> wrote:
>
> > Hi Benchao & Jingsong,
> >
> > 谢谢你们的回复。的确使用sliding time window也是需要实现merge的。
> >
> >
> 这里有个额外问题想问一下Jingsong,就是在我当前这个需求场景下,能否给一些提示,关于如果merge以后,怎么准确又高效地更新结果的方差值呢?我目前想到的是在add里面可以增量更新方差值,同时用set记录每个出现的数值,然后在merge的时候,将set进行合并,重新计算一遍。保证结果的准确。(没有想到以增量更新的方式准确更新merge后的结果)。可以些idea吗
> >
> > Thanks,
> > Zhefu
> >
> > On Wed, May 6, 2020 at 22:00 Jingsong Li <[hidden email]> wrote:
> >
> > > Hi,
> > >
> > > 首先merge会不会导致结果不准确是完全看你的实现,这里你 的需求看起来并没有实现上的困难。
> > >
> > > merge这个接口就是为了优化性能才提出的,就像Benchao说的那样,如果你实现了merge,引擎会给做一些pane的优化。
> > >
> > > 当然前提是merge是有效率提升的,具体根据你的实现和测试来判断。
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, May 6, 2020 at 9:22 PM Benchao Li <[hidden email]> wrote:
> > >
> > > > Hi,
> > > >
> > > > 如果你用的sliding window,应该是也需要实现merge的。因为sliding window本身做了一个pane的优化。
> > > > 会按照sliding size和window size取最大公约数作为pane,然后在trigger window计算的时候,把属于这个
> > > > window的pane都merge起来。
> > > >
> > > > Zhefu PENG <[hidden email]> 于2020年5月6日周三 下午9:05写道:
> > > >
> > > > > 非常感谢。那我是不是能理解为:我在这里用的是sliding time
> > > > > window,就不会用到merge功能,所以merge的功能不会影响到我结果的准确性?
> > > > >
> > > > > On Wed, May 6, 2020 at 20:49 1193216154 <[hidden email]> wrote:
> > > > >
> > > > > > 官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口
> > > > > >
> > > > > >
> > > > > >
> > > > > > ---原始邮件---
> > > > > > 发件人: "Zhefu PENG"<[hidden email]&gt;
> > > > > > 发送时间: 2020年5月6日(周三) 晚上8:34
> > > > > > 收件人: "user-zh"<[hidden email]&gt;;
> > > > > > 主题: flink窗口函数AggregateFunction中,merge的作用和应用场景
> > > > > >
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。
> > > > > >
> > > > > > 代码(简单伪代码)如下:
> > > > > > 简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
> > > > > > 第二项是计算的方差结果,输出的结果是
> > > > > > class MergeAggregateExample extends AggregateFunction[Double,
> (Set,
> > > > > > Double), Double] {
> > > > > > &nbsp;&nbsp;&nbsp; override def createAccumulator(): (Set,
> Double)
> > =
> > > {
> > > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //Set用来对数据进行去重, 后一项是计算方差后的结果
> > > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; (Set(), 0.0)
> > > > > > &nbsp;&nbsp;&nbsp; }
> > > > > >
> > > > > > &nbsp;&nbsp;&nbsp; override def add(item: Double, accumulator:
> > (Set,
> > > > > > Double)): (Set,
> > > > > > Double) = {
> > > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //判断新来的数据是否之前出现过,如果之前没出现过: 更新set,
> > > > > > 并且增量更新方差;如果没出现过,直接跳过
> > > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new Tuple2(Set, Double)
> > > > > > &nbsp;&nbsp;&nbsp; }
> > > > > >
> > > > > >
> > > > > > &nbsp;&nbsp;&nbsp; override def getResult(accumulator: (Set,
> > > Double)):
> > > > > > Double = {
> > > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 返回计算的方差结果
> > > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; accumulator._2
> > > > > > &nbsp;&nbsp;&nbsp; }
> > > > > >
> > > > > > &nbsp;&nbsp;&nbsp; override def merge(a: (Set, Double), b: (Set,
> > > > > Double)):
> > > > > > (Set, Double) =
> > > > > > {
> > > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //
> > 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
> > > > > > &nbsp; }
> > > > > >
> > > > > >
> > > > > > 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。
> > > > > >
> > > > > > Looking forward to your reply and help.
> > > > > >
> > > > > > Best,
> > > > > > Zhefu
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Benchao Li
> > > > School of Electronics Engineering and Computer Science, Peking
> > University
> > > > Tel:+86-15650713730
> > > > Email: [hidden email]; [hidden email]
> > > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>
>
> --
> Best, Jingsong Lee
>