flink sql count distonct 优化

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

flink sql count distonct 优化

guomuhua
在SQL中,如果开启了 local-global 参数:set table.optimizer.agg-phase-strategy=TWO_PHASE;
或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
                                         set
table.optimizer.distinct-agg.split.bucket-num=1024;
还需要对应的将SQL改写为两段式吗?
例如:
    原SQL:
    SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,

    对所需DISTINCT字段buy_id模1024自动打散后,SQL:
    SELECT day, SUM(cnt) total
    FROM (
    SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
    FROM T GROUP BY day, MOD(buy_id, 1024))
    GROUP BY day

还是flink会帮我自动改写SQL,我不用关心?

另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
<http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink sql count distonct 优化

LiangbinZhang
Hi,guomuhua
      开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。

Best,
Robin


guomuhua wrote

> 在SQL中,如果开启了 local-global 参数:set
> table.optimizer.agg-phase-strategy=TWO_PHASE;
> 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
>                                          set
> table.optimizer.distinct-agg.split.bucket-num=1024;
> 还需要对应的将SQL改写为两段式吗?
> 例如:
>     原SQL:
>     SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
>
>     对所需DISTINCT字段buy_id模1024自动打散后,SQL:
>     SELECT day, SUM(cnt) total
>     FROM (
>     SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
>     FROM T GROUP BY day, MOD(buy_id, 1024))
>     GROUP BY day
>
> 还是flink会帮我自动改写SQL,我不用关心?
>
> 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
> &lt;http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png&gt; 
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink sql count distonct 优化

Jark
Administrator
我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
agg支持这个参数了。可以期待下。

Best,
Jark

On Wed, 24 Mar 2021 at 19:29, Robin Zhang <[hidden email]>
wrote:

> Hi,guomuhua
>       开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
>
> Best,
> Robin
>
>
> guomuhua wrote
> > 在SQL中,如果开启了 local-global 参数:set
> > table.optimizer.agg-phase-strategy=TWO_PHASE;
> > 或者开启了Partial-Final 参数:set
> table.optimizer.distinct-agg.split.enabled=true;
> >                                          set
> > table.optimizer.distinct-agg.split.bucket-num=1024;
> > 还需要对应的将SQL改写为两段式吗?
> > 例如:
> >     原SQL:
> >     SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> >
> >     对所需DISTINCT字段buy_id模1024自动打散后,SQL:
> >     SELECT day, SUM(cnt) total
> >     FROM (
> >     SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> >     FROM T GROUP BY day, MOD(buy_id, 1024))
> >     GROUP BY day
> >
> > 还是flink会帮我自动改写SQL,我不用关心?
> >
> > 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
> > &lt;
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png&gt;
>
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql count distonct 优化

guomuhua
Jark wrote
> 我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
> agg支持这个参数了。可以期待下。
>
> Best,
> Jark
>
> On Wed, 24 Mar 2021 at 19:29, Robin Zhang &lt;

> vincent2015qdlg@

> &gt;
> wrote:
>
>> Hi,guomuhua
>>       开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
>>
>> Best,
>> Robin
>>
>>
>> guomuhua wrote
>> > 在SQL中,如果开启了 local-global 参数:set
>> > table.optimizer.agg-phase-strategy=TWO_PHASE;
>> > 或者开启了Partial-Final 参数:set
>> table.optimizer.distinct-agg.split.enabled=true;
>> >                                          set
>> > table.optimizer.distinct-agg.split.bucket-num=1024;
>> > 还需要对应的将SQL改写为两段式吗?
>> > 例如:
>> >     原SQL:
>> >     SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
>> >
>> >     对所需DISTINCT字段buy_id模1024自动打散后,SQL:
>> >     SELECT day, SUM(cnt) total
>> >     FROM (
>> >     SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
>> >     FROM T GROUP BY day, MOD(buy_id, 1024))
>> >     GROUP BY day
>> >
>> > 还是flink会帮我自动改写SQL,我不用关心?
>> >
>> > 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
>> > &lt;
>> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png&gt;
>>
>> >
>> >
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>

感谢,如果不是window agg,开启参数后flink会自动打散是吧。那关于window agg,
不能自动打散,这部分的介绍,在文档中可以找到吗?具体在哪里呢?还是需要从源码里找呢?望指教。再次感谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink sql count distonct 优化

Jark
Administrator
> 如果不是window agg,开启参数后flink会自动打散是吧
是的

> 那关于window agg, 不能自动打散,这部分的介绍,在文档中可以找到吗?
文档中没有说明。 这个文档[1] 里说地都是针对 unbounded agg 的优化。

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation

On Fri, 26 Mar 2021 at 11:00, guomuhua <[hidden email]> wrote:

> Jark wrote
> > 我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
> > agg支持这个参数了。可以期待下。
> >
> > Best,
> > Jark
> >
> > On Wed, 24 Mar 2021 at 19:29, Robin Zhang &lt;
>
> > vincent2015qdlg@
>
> > &gt;
> > wrote:
> >
> >> Hi,guomuhua
> >>       开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
> >>
> >> Best,
> >> Robin
> >>
> >>
> >> guomuhua wrote
> >> > 在SQL中,如果开启了 local-global 参数:set
> >> > table.optimizer.agg-phase-strategy=TWO_PHASE;
> >> > 或者开启了Partial-Final 参数:set
> >> table.optimizer.distinct-agg.split.enabled=true;
> >> >                                          set
> >> > table.optimizer.distinct-agg.split.bucket-num=1024;
> >> > 还需要对应的将SQL改写为两段式吗?
> >> > 例如:
> >> >     原SQL:
> >> >     SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> >> >
> >> >     对所需DISTINCT字段buy_id模1024自动打散后,SQL:
> >> >     SELECT day, SUM(cnt) total
> >> >     FROM (
> >> >     SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> >> >     FROM T GROUP BY day, MOD(buy_id, 1024))
> >> >     GROUP BY day
> >> >
> >> > 还是flink会帮我自动改写SQL,我不用关心?
> >> >
> >> > 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
> >> > &lt;
> >>
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png&gt
> ;
> >>
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
> 感谢,如果不是window agg,开启参数后flink会自动打散是吧。那关于window agg,
> 不能自动打散,这部分的介绍,在文档中可以找到吗?具体在哪里呢?还是需要从源码里找呢?望指教。再次感谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql count distonct 优化

LiangbinZhang
Hi,Jark
       我理解疑问中的sql是一个普通的agg操作,只不过分组的键是时间字段,不知道您说的 `我看你的作业里面是window agg`
,这个怎么理解

Best,
Robin



Jark wrote

>> 如果不是window agg,开启参数后flink会自动打散是吧
> 是的
>
>> 那关于window agg, 不能自动打散,这部分的介绍,在文档中可以找到吗?
> 文档中没有说明。 这个文档[1] 里说地都是针对 unbounded agg 的优化。
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
>
> On Fri, 26 Mar 2021 at 11:00, guomuhua <

> 663021157@

>> wrote:
>
>> Jark wrote
>> > 我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
>> > agg支持这个参数了。可以期待下。
>> >
>> > Best,
>> > Jark
>> >
>> > On Wed, 24 Mar 2021 at 19:29, Robin Zhang &lt;
>>
>> > vincent2015qdlg@
>>
>> > &gt;
>> > wrote:
>> >
>> >> Hi,guomuhua
>> >>       开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
>> >>
>> >> Best,
>> >> Robin
>> >>
>> >>
>> >> guomuhua wrote
>> >> > 在SQL中,如果开启了 local-global 参数:set
>> >> > table.optimizer.agg-phase-strategy=TWO_PHASE;
>> >> > 或者开启了Partial-Final 参数:set
>> >> table.optimizer.distinct-agg.split.enabled=true;
>> >> >                                          set
>> >> > table.optimizer.distinct-agg.split.bucket-num=1024;
>> >> > 还需要对应的将SQL改写为两段式吗?
>> >> > 例如:
>> >> >     原SQL:
>> >> >     SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
>> >> >
>> >> >     对所需DISTINCT字段buy_id模1024自动打散后,SQL:
>> >> >     SELECT day, SUM(cnt) total
>> >> >     FROM (
>> >> >     SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
>> >> >     FROM T GROUP BY day, MOD(buy_id, 1024))
>> >> >     GROUP BY day
>> >> >
>> >> > 还是flink会帮我自动改写SQL,我不用关心?
>> >> >
>> >> > 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
>> >> > &lt;
>> >>
>> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png&gt
>> ;
>> >>
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
>> 感谢,如果不是window agg,开启参数后flink会自动打散是吧。那关于window agg,
>> 不能自动打散,这部分的介绍,在文档中可以找到吗?具体在哪里呢?还是需要从源码里找呢?望指教。再次感谢
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink sql count distonct 优化

LiangbinZhang
In reply to this post by guomuhua
Hi,guomuhua
   `The number of inputs accumulated by local aggregation every time is
based on mini-batch interval. It means local-global aggregation depends on
mini-batch optimization is enabled `
,关于本地聚合,官网有这么一段话,也就是说,需要先开启批次聚合,然后才能使用本地聚合,加起来有三个参数.
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
     看你提问时只是开启了本地聚合一个参数,不知道是不是没写全。
Best,
Robin




guomuhua wrote

> 在SQL中,如果开启了 local-global 参数:set
> table.optimizer.agg-phase-strategy=TWO_PHASE;
> 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
>                                          set
> table.optimizer.distinct-agg.split.bucket-num=1024;
> 还需要对应的将SQL改写为两段式吗?
> 例如:
>     原SQL:
>     SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
>
>     对所需DISTINCT字段buy_id模1024自动打散后,SQL:
>     SELECT day, SUM(cnt) total
>     FROM (
>     SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
>     FROM T GROUP BY day, MOD(buy_id, 1024))
>     GROUP BY day
>
> 还是flink会帮我自动改写SQL,我不用关心?
>
> 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
> &lt;http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png&gt; 
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/