In SQL, if I enable the local-global parameter:
set table.optimizer.agg-phase-strategy=TWO_PHASE;
or enable the Partial-Final parameters:
set table.optimizer.distinct-agg.split.enabled=true;
set table.optimizer.distinct-agg.split.bucket-num=1024;
do I still need to rewrite the SQL into a two-stage form myself? For example:

Original SQL:
SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day

After scattering the DISTINCT field buy_id into 1024 buckets with MOD:
SELECT day, SUM(cnt) total
FROM (
  SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
  FROM T GROUP BY day, MOD(buy_id, 1024))
GROUP BY day

Or does Flink rewrite the SQL for me automatically, so I don't need to care about it?

Also, when I only set the parameters above without rewriting the SQL, I don't see any optimization, and the Flink Web UI doesn't show any two-phase operators:
<http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>

-- 
Sent from: http://apache-flink.147419.n8.nabble.com/
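To see why the two-stage rewrite above returns the same result as the original query, here is a minimal in-memory sketch in plain Java. It is not Flink code, and the class, method and field names are hypothetical. It assumes rows are simple (day, buy_id) pairs and buckets them with Math.floorMod(buy_id, 1024), mirroring the manual MOD rewrite; Flink's own automatic split may use a different bucketing function. The key point is that every buy_id value lands in exactly one bucket, so summing the per-bucket distinct counts gives the per-day distinct count.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory illustration of splitting COUNT(DISTINCT) into two stages (not Flink code).
final class SplitDistinctSketch {

    // Hypothetical stand-in for one record of table T.
    static final class Row {
        final String day;
        final long buyId;

        Row(String day, long buyId) {
            this.day = day;
            this.buyId = buyId;
        }
    }

    // Single stage: SELECT day, COUNT(DISTINCT buy_id) FROM T GROUP BY day
    static Map<String, Long> countDistinct(List<Row> rows) {
        Map<String, Set<Long>> seen = new HashMap<>();
        for (Row r : rows) {
            seen.computeIfAbsent(r.day, k -> new HashSet<>()).add(r.buyId);
        }
        Map<String, Long> out = new HashMap<>();
        seen.forEach((day, ids) -> out.put(day, (long) ids.size()));
        return out;
    }

    // Two stages: inner GROUP BY (day, bucket) with COUNT(DISTINCT), outer SUM per day.
    static Map<String, Long> splitCountDistinct(List<Row> rows, int bucketNum) {
        // Stage 1 ("partial"): distinct buy_ids per (day, bucket).
        Map<String, Map<Integer, Set<Long>>> partial = new HashMap<>();
        for (Row r : rows) {
            int bucket = (int) Math.floorMod(r.buyId, (long) bucketNum);
            partial.computeIfAbsent(r.day, k -> new HashMap<>())
                   .computeIfAbsent(bucket, k -> new HashSet<>())
                   .add(r.buyId);
        }
        // Stage 2 ("final"): sum the per-bucket counts; no buy_id is counted twice
        // because each value belongs to exactly one bucket.
        Map<String, Long> out = new HashMap<>();
        partial.forEach((day, buckets) -> {
            long total = 0;
            for (Set<Long> ids : buckets.values()) {
                total += ids.size();
            }
            out.put(day, total);
        });
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("2021-03-24", 1L), new Row("2021-03-24", 1025L),
                new Row("2021-03-24", 1L), new Row("2021-03-25", 7L));
        System.out.println(countDistinct(rows));
        System.out.println(splitCountDistinct(rows, 1024));
    }
}
```

Both calls in main yield 2 distinct buyers for 2021-03-24 and 1 for 2021-03-25; buy_id 1 and 1025 share bucket 1, and the per-bucket set still deduplicates them correctly.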
Hi, guomuhua
Once local aggregation is enabled, you don't need to scatter the data and do the second-level aggregation yourself. I suggest reading the official documentation.
Best,
Robin
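For a plain (non-DISTINCT) aggregate, the local-global idea can be pictured with the plain-Java sketch below. It assumes a COUNT(*) grouped by day and two upstream tasks; the class and method names are hypothetical and this is not Flink's implementation. The local phase collapses each task's buffered records into one partial count per key before the shuffle, so a hot key sends one record per task instead of one per input row; the global phase then merges the partial accumulators.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch of local-global (TWO_PHASE) aggregation for COUNT(*) GROUP BY key.
final class LocalGlobalSketch {

    // Local phase: runs on each upstream task before the shuffle and
    // collapses its buffered records into one partial count per key.
    static Map<String, Long> localAggregate(List<String> keys) {
        Map<String, Long> partial = new HashMap<>();
        for (String k : keys) {
            partial.merge(k, 1L, Long::sum);
        }
        return partial;
    }

    // Global phase: runs after the shuffle and merges the partial accumulators.
    static Map<String, Long> globalMerge(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two upstream tasks, both dominated by the hot key "2021-03-24".
        List<String> task1 = List.of("2021-03-24", "2021-03-24", "2021-03-24", "2021-03-25");
        List<String> task2 = List.of("2021-03-24", "2021-03-24");
        List<Map<String, Long>> partials = new ArrayList<>();
        partials.add(localAggregate(task1));
        partials.add(localAggregate(task2));
        // Each partial entry is one record sent across the shuffle.
        int shuffled = partials.stream().mapToInt(Map::size).sum();
        System.out.println("records shuffled: " + shuffled + " instead of " + (task1.size() + task2.size()));
        System.out.println(globalMerge(partials));
    }
}
```

Running main first prints `records shuffled: 3 instead of 6`, then the merged counts (5 for 2021-03-24, 1 for 2021-03-25).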
I see that your job uses a window aggregation. Window aggregations do not support automatic splitting yet. In 1.13, window aggregations based on window TVFs support this parameter, so stay tuned.
Best,
Jark
(In reply to Robin Zhang, Wed, 24 Mar 2021 at 19:29)
In reply to this post by Jark
Thanks. So if it's not a window aggregation, Flink scatters the data automatically once the parameters are enabled, right? As for window aggregations not being split automatically, is that described anywhere in the documentation? Where exactly? Or do I need to look in the source code? Any pointers would be appreciated. Thanks again.
> So if it's not a window aggregation, Flink scatters the data automatically once the parameters are enabled, right?
Yes.
> As for window aggregations not being split automatically, is that described anywhere in the documentation?
It isn't documented. Everything in this document [1] refers to optimizations for unbounded aggregations.
Best,
Jark
[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
(In reply to guomuhua, Fri, 26 Mar 2021 at 11:00)
Hi, Jark
My understanding is that the SQL in the question is an ordinary aggregation whose grouping key just happens to be a time field. How should I understand your remark "I see that your job uses a window agg"?
Best,
Robin
In reply to this post by guomuhua
Hi, guomuhua
The official documentation says this about local aggregation: `The number of inputs accumulated by local aggregation every time is based on mini-batch interval. It means local-global aggregation depends on mini-batch optimization is enabled`. In other words, mini-batch aggregation must be enabled before local aggregation can be used. Mini-batch is disabled by default, so four settings are needed in total:
configuration.setString("table.exec.mini-batch.enabled", "true"); // turn on mini-batch
configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // max time to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000"); // max records buffered per aggregate task
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // local-global aggregation
Your question only mentions the local aggregation parameter; perhaps you didn't list all of them.
Best,
Robin
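The dependency on mini-batch can be pictured as a buffer that collects records until either a size limit or a latency limit is reached, and only then hands the batch to the local aggregate. This is a simplified plain-Java sketch with hypothetical names: the two constructor arguments merely play the roles of table.exec.mini-batch.size and table.exec.mini-batch.allow-latency, and latency is checked only when a record arrives, whereas Flink triggers batches with its own timing mechanism.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of mini-batch buffering in front of a local aggregate (not Flink code).
final class MiniBatchSketch {
    private final long allowLatencyMs; // role of table.exec.mini-batch.allow-latency
    private final int maxSize;         // role of table.exec.mini-batch.size
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long batchStartMs;

    MiniBatchSketch(long allowLatencyMs, int maxSize) {
        this.allowLatencyMs = allowLatencyMs;
        this.maxSize = maxSize;
    }

    // Buffer one record; close the batch when it is full or has waited long enough.
    void add(String key, long nowMs) {
        if (buffer.isEmpty()) {
            batchStartMs = nowMs;
        }
        buffer.add(key);
        if (buffer.size() >= maxSize || nowMs - batchStartMs >= allowLatencyMs) {
            flush();
        }
    }

    // Hand the current batch downstream; a real local aggregate would pre-aggregate it here.
    void flush() {
        if (!buffer.isEmpty()) {
            flushed.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    List<List<String>> batches() {
        return flushed;
    }

    public static void main(String[] args) {
        MiniBatchSketch mb = new MiniBatchSketch(5_000, 3);
        mb.add("a", 0);
        mb.add("b", 100);
        mb.add("a", 200);   // size limit reached: first batch closed
        mb.add("c", 1_000);
        mb.add("a", 7_000); // 6 s since batch start: second batch closed
        mb.add("b", 7_500);
        mb.flush();         // end of input
        System.out.println(mb.batches());
    }
}
```

With a 5 s latency and a size of 3, main prints `[[a, b, a], [c, a], [b]]`: the first batch is closed by the size limit, the second by the latency limit, and the last by the final flush. Without such batches the local aggregate has nothing to pre-aggregate, which is why TWO_PHASE alone shows no effect.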