大家好:
我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG 等等。 我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误) SQL: select query_nor, sum(cast (1asbigint))as query_nor_counter from ods_search_track groupby query_nor, HOP( event_time,interval'30'SECOND,interval'30'MINUTE) sum: public class Sum extends AggregateFunction<Long, AtomicLong> { @Override public boolean isDeterministic() { return false; } @Override public AtomicLong createAccumulator() { return new AtomicLong(); } @Override public void open(FunctionContext context) throws Exception { } @Override public Long getValue(AtomicLong acc) { return acc.get(); } @Override public TypeInformation getResultType() { return Types.LONG; } public void merge(AtomicLong acc, Iterable<AtomicLong> it) { Iterator<AtomicLong> iter = it.iterator(); while (iter.hasNext()) { AtomicLong a = iter.next(); acc.addAndGet(a.get()); } } public void accumulate(AtomicLong datas, Long data) { datas.addAndGet(data); } } 使用 Flink buildin COUNT select query_nor, count(1) as query_nor_counter from ods_search_track groupby query_nor, HOP( event_time,interval'30'SECOND,interval'30'MINUTE) |
Administrator
|
Hi,
看了你的 UDAF 的实现,理论上是不可能比内置 count/sum 快的。可能是哪里有 bug 或是测试方式不对。 我想先问几个问题: 1. 是基于哪个版本,哪个 planner 进行的测试? 2. 流计算模式还是批计算模式? 3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗? Best, Jark On Tue, 28 Apr 2020 at 10:46, forideal <[hidden email]> wrote: > 大家好: > > > 我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG > 等等。 > 我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误) > > > SQL: > > > select > query_nor, > sum(cast (1asbigint))as query_nor_counter > from ods_search_track > groupby > query_nor, > HOP( > event_time,interval'30'SECOND,interval'30'MINUTE) > sum: > public class Sum extends AggregateFunction<Long, AtomicLong> { > > @Override > public boolean isDeterministic() { > return false; > } > > @Override > public AtomicLong createAccumulator() { > return new AtomicLong(); > } > > @Override > public void open(FunctionContext context) throws Exception { > > } > > @Override > public Long getValue(AtomicLong acc) { > return acc.get(); > } > > @Override > public TypeInformation getResultType() { > return Types.LONG; > } > > public void merge(AtomicLong acc, Iterable<AtomicLong> it) { > Iterator<AtomicLong> iter = it.iterator(); > while (iter.hasNext()) { > AtomicLong a = iter.next(); > acc.addAndGet(a.get()); > } > } > > public void accumulate(AtomicLong datas, Long data) { > datas.addAndGet(data); > } > } > > > 使用 Flink buildin COUNT > > > select > query_nor, > count(1) as query_nor_counter > from ods_search_track > groupby > query_nor, > HOP( > event_time,interval'30'SECOND,interval'30'MINUTE) |
Hi Jark: Thanks for your replay! 1. 是基于哪个版本,哪个 planner 进行的测试? Flink 1.9.0 Blink Planner 2. 流计算模式还是批计算模式? 流计算模式 3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗? 注册的名字为 red_sum Best forideal 在 2020-04-28 11:13:50,"Jark Wu" <[hidden email]> 写道: >Hi, > >看了你的 UDAF 的实现,理论上是不可能比内置 count/sum 快的。可能是哪里有 bug 或是测试方式不对。 >我想先问几个问题: >1. 是基于哪个版本,哪个 planner 进行的测试? >2. 流计算模式还是批计算模式? >3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗? > >Best, >Jark > >On Tue, 28 Apr 2020 at 10:46, forideal <[hidden email]> wrote: > >> 大家好: >> >> >> 我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG >> 等等。 >> 我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误) >> >> >> SQL: >> >> >> select >> query_nor, >> sum(cast (1asbigint))as query_nor_counter >> from ods_search_track >> groupby >> query_nor, >> HOP( >> event_time,interval'30'SECOND,interval'30'MINUTE) >> sum: >> public class Sum extends AggregateFunction<Long, AtomicLong> { >> >> @Override >> public boolean isDeterministic() { >> return false; >> } >> >> @Override >> public AtomicLong createAccumulator() { >> return new AtomicLong(); >> } >> >> @Override >> public void open(FunctionContext context) throws Exception { >> >> } >> >> @Override >> public Long getValue(AtomicLong acc) { >> return acc.get(); >> } >> >> @Override >> public TypeInformation getResultType() { >> return Types.LONG; >> } >> >> public void merge(AtomicLong acc, Iterable<AtomicLong> it) { >> Iterator<AtomicLong> iter = it.iterator(); >> while (iter.hasNext()) { >> AtomicLong a = iter.next(); >> acc.addAndGet(a.get()); >> } >> } >> >> public void accumulate(AtomicLong datas, Long data) { >> datas.addAndGet(data); >> } >> } >> >> >> 使用 Flink buildin COUNT >> >> >> select >> query_nor, >> count(1) as query_nor_counter >> from ods_search_track >> groupby >> query_nor, >> HOP( >> event_time,interval'30'SECOND,interval'30'MINUTE) |
Free forum by Nabble | Edit this page |