Flink Buildin UDF 性能较慢

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink Buildin UDF 性能较慢

forideal
大家好:


  我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG 等等。
我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误)


SQL:


select
      query_nor,
      sum(cast (1asbigint))as query_nor_counter
    from ods_search_track
    groupby
      query_nor,
      HOP(
        event_time,interval'30'SECOND,interval'30'MINUTE)
sum:
public class Sum extends AggregateFunction<Long, AtomicLong> {

@Override
public boolean isDeterministic() {
return false;
}

@Override
public AtomicLong createAccumulator() {
return new AtomicLong();
}

@Override
public void open(FunctionContext context) throws Exception {

    }

@Override
public Long getValue(AtomicLong acc) {
return acc.get();
}

@Override
public TypeInformation getResultType() {
return Types.LONG;
}

public void merge(AtomicLong acc, Iterable<AtomicLong> it) {
        Iterator<AtomicLong> iter = it.iterator();
        while (iter.hasNext()) {
            AtomicLong a = iter.next();
acc.addAndGet(a.get());
}
    }

public void accumulate(AtomicLong datas, Long data) {
        datas.addAndGet(data);
}
}


使用 Flink buildin COUNT


select
      query_nor,
      count(1) as query_nor_counter
    from ods_search_track
    groupby
      query_nor,
      HOP(
        event_time,interval'30'SECOND,interval'30'MINUTE)
Reply | Threaded
Open this post in threaded view
|

Re: Flink Buildin UDF 性能较慢

Jark
Administrator
Hi,

看了你的 UDAF 的实现,理论上是不可能比内置 count/sum 快的。可能是哪里有 bug 或是测试方式不对。
我想先问几个问题:
1. 是基于哪个版本,哪个 planner 进行的测试?
2. 流计算模式还是批计算模式?
3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗?

Best,
Jark

On Tue, 28 Apr 2020 at 10:46, forideal <[hidden email]> wrote:

> 大家好:
>
>
>   我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG
> 等等。
> 我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误)
>
>
> SQL:
>
>
> select
>       query_nor,
>       sum(cast (1asbigint))as query_nor_counter
>     from ods_search_track
>     groupby
>       query_nor,
>       HOP(
>         event_time,interval'30'SECOND,interval'30'MINUTE)
> sum:
> public class Sum extends AggregateFunction<Long, AtomicLong> {
>
> @Override
> public boolean isDeterministic() {
> return false;
> }
>
> @Override
> public AtomicLong createAccumulator() {
> return new AtomicLong();
> }
>
> @Override
> public void open(FunctionContext context) throws Exception {
>
>     }
>
> @Override
> public Long getValue(AtomicLong acc) {
> return acc.get();
> }
>
> @Override
> public TypeInformation getResultType() {
> return Types.LONG;
> }
>
> public void merge(AtomicLong acc, Iterable<AtomicLong> it) {
>         Iterator<AtomicLong> iter = it.iterator();
>         while (iter.hasNext()) {
>             AtomicLong a = iter.next();
> acc.addAndGet(a.get());
> }
>     }
>
> public void accumulate(AtomicLong datas, Long data) {
>         datas.addAndGet(data);
> }
> }
>
>
> 使用 Flink buildin COUNT
>
>
> select
>       query_nor,
>       count(1) as query_nor_counter
>     from ods_search_track
>     groupby
>       query_nor,
>       HOP(
>         event_time,interval'30'SECOND,interval'30'MINUTE)
Reply | Threaded
Open this post in threaded view
|

Re:Re: Flink Buildin UDF 性能较慢

forideal



Hi Jark:


    Thanks for your replay!


    1. 是基于哪个版本,哪个 planner 进行的测试?
Flink 1.9.0 Blink Planner
2. 流计算模式还是批计算模式?
流计算模式
3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗?
      注册的名字为 red_sum


Best forideal
在 2020-04-28 11:13:50,"Jark Wu" <[hidden email]> 写道:

>Hi,
>
>看了你的 UDAF 的实现,理论上是不可能比内置 count/sum 快的。可能是哪里有 bug 或是测试方式不对。
>我想先问几个问题:
>1. 是基于哪个版本,哪个 planner 进行的测试?
>2. 流计算模式还是批计算模式?
>3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗?
>
>Best,
>Jark
>
>On Tue, 28 Apr 2020 at 10:46, forideal <[hidden email]> wrote:
>
>> 大家好:
>>
>>
>>   我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG
>> 等等。
>> 我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误)
>>
>>
>> SQL:
>>
>>
>> select
>>       query_nor,
>>       sum(cast (1asbigint))as query_nor_counter
>>     from ods_search_track
>>     groupby
>>       query_nor,
>>       HOP(
>>         event_time,interval'30'SECOND,interval'30'MINUTE)
>> sum:
>> public class Sum extends AggregateFunction<Long, AtomicLong> {
>>
>> @Override
>> public boolean isDeterministic() {
>> return false;
>> }
>>
>> @Override
>> public AtomicLong createAccumulator() {
>> return new AtomicLong();
>> }
>>
>> @Override
>> public void open(FunctionContext context) throws Exception {
>>
>>     }
>>
>> @Override
>> public Long getValue(AtomicLong acc) {
>> return acc.get();
>> }
>>
>> @Override
>> public TypeInformation getResultType() {
>> return Types.LONG;
>> }
>>
>> public void merge(AtomicLong acc, Iterable<AtomicLong> it) {
>>         Iterator<AtomicLong> iter = it.iterator();
>>         while (iter.hasNext()) {
>>             AtomicLong a = iter.next();
>> acc.addAndGet(a.get());
>> }
>>     }
>>
>> public void accumulate(AtomicLong datas, Long data) {
>>         datas.addAndGet(data);
>> }
>> }
>>
>>
>> 使用 Flink buildin COUNT
>>
>>
>> select
>>       query_nor,
>>       count(1) as query_nor_counter
>>     from ods_search_track
>>     groupby
>>       query_nor,
>>       HOP(
>>         event_time,interval'30'SECOND,interval'30'MINUTE)