flink 1.11: TableEnvironment does not support registering an Aggregate Function?

flink 1.11: TableEnvironment does not support registering an Aggregate Function?

lingchanhu
*flink1.11*
When I register and use a custom AggregateFunction in a TableEnvironment, the error below is thrown. The code is posted underneath. (Registering and using the same function with a StreamTableEnvironment works fine, which suggests the custom function itself is OK.)

org.apache.flink.table.api.TableException: Aggregate functions are not updated to the new type system yet.
        at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
        at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
        at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
        at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
        at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
        at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
        at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
        at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
        at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
        at java.util.function.Function.lambda$andThen$1(Function.java:88)
        at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
        at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
        at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
        at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
        at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
        at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
        at org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
        at org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
        at com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
        at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)

*// The code is below*
// main
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();

TableEnvironment tEnv = TableEnvironment.create(envSettings);

// register the source table (JDBC table source)
tEnv.executeSql("CREATE TABLE wx_event_log (....) with ('connect.type'='jdbc'),....");

// register the sink table (filesystem/CSV table sink)
tEnv.executeSql("CREATE TABLE wx_data_statistics (....) with ('connect.type'='filesystem','format.type'='csv',.....)");

// register the aggregate function
tEnv.createTemporarySystemFunction("firSendMsgFunc", new FirstSendMsgFunc());

Table table2 = tEnv.sqlQuery("select from_user,create_time from wx_event_log where msg_type='text' and create_time between '2020-03-20' and '2020-03-21'");

table2.groupBy($("from_user"))
        .aggregate(call("firSendMsgFunc", $("create_time")).as("first_send_msg_today"))
        .select($("from_user"), $("first_send_msg_today"))
        .executeInsert("wx_data_statistics");


// 自定义agg function类
public class FirstSendMsgFunc extends
AggregateFunction<LocalDateTime,CountDTO> {

    public void accumulate(CountDTO acc, LocalDateTime createTime) {
        if (acc.getDateTime() == null) {
            acc.setDateTime(createTime);
        } else if (acc.getDateTime().isAfter(createTime)) {
            acc.setDateTime(createTime);
        }
    }

    @Override
    public LocalDateTime getValue(CountDTO acc) {
        return acc.getDateTime();
    }

    @Override
    public CountDTO createAccumulator() {
        return new CountDTO();
    }
}

// accumulator POJO class
public class CountDTO implements Serializable {

    private Integer count;

    private LocalDateTime dateTime;

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }

    public LocalDateTime getDateTime() {
        return dateTime;
    }

    public void setDateTime(LocalDateTime dateTime) {
        this.dateTime = dateTime;
    }
}





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.11: TableEnvironment does not support registering an Aggregate Function?

Jark
Administrator
In 1.11, the TableEnvironment#createTemporarySystemFunction interface does not support AggregateFunction yet.
You say it works with StreamTableEnvironment; I suspect you are using StreamTableEnvironment#registerFunction, which does support AggregateFunction.
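
For reference, a minimal sketch of that registration path on 1.11, reusing the FirstSendMsgFunc class from the original post (untested):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RegisterAggOn111 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance()
                        .useBlinkPlanner()
                        .inStreamingMode()   // the bridge environment only runs in streaming mode on 1.11
                        .build());

        // the old registration API still accepts an AggregateFunction on 1.11,
        // unlike createTemporarySystemFunction
        tEnv.registerFunction("firSendMsgFunc", new FirstSendMsgFunc());
    }
}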

Best,
Jark


Re: flink 1.11: TableEnvironment does not support registering an Aggregate Function?

Jark
Administrator
Btw, in the 1.12 release the TableEnvironment#createTemporarySystemFunction interface does support AggregateFunction.
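
For example, on 1.12 the batch-mode registration from the original post should work unchanged; a minimal sketch reusing the same classes (not verified against a complete 1.12 job):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterAggOn112 {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .useBlinkPlanner()
                        .inBatchMode()
                        .build());

        // on 1.12+ this no longer throws
        // "Aggregate functions are not updated to the new type system yet"
        tEnv.createTemporarySystemFunction("firSendMsgFunc", FirstSendMsgFunc.class);
    }
}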

Re: flink 1.11: TableEnvironment does not support registering an Aggregate Function?

lingchanhu
In reply to this post by Jark
Thanks a lot. If flink 1.11 does not support this for now, what would you suggest for this use case? I want to batch-process the data and also need a custom aggregate function in the job.



Re: flink 1.11: TableEnvironment does not support registering an Aggregate Function?

Jark
Administrator
Construct a StreamTableEnvironmentImpl directly through its constructor, passing isStreamingMode = false.
You can then call registerFunction on that instance.
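
A rough, untested sketch of that construction follows. It is pieced together from memory of the wiring inside StreamTableEnvironmentImpl.create in 1.11, with the streaming flag flipped to false, so every constructor and factory signature here should be checked against your exact 1.11 source before use (the function name and FirstSendMsgFunc class are reused from the original post):

import java.lang.reflect.Method;
import java.util.Map;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.module.ModuleManager;

public class BatchStreamTableEnvSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableConfig tableConfig = new TableConfig();
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

        ModuleManager moduleManager = new ModuleManager();
        CatalogManager catalogManager = CatalogManager.newBuilder()
                .classLoader(classLoader)
                .config(tableConfig.getConfiguration())
                .defaultCatalog(
                        settings.getBuiltInCatalogName(),
                        new GenericInMemoryCatalog(
                                settings.getBuiltInCatalogName(),
                                settings.getBuiltInDatabaseName()))
                .executionConfig(env.getConfig())
                .build();
        FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);

        // executor bound to the given StreamExecutionEnvironment (the same reflection
        // trick the bridge factory method uses internally)
        Map<String, String> executorProps = settings.toExecutorProperties();
        ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProps);
        Method create = executorFactory.getClass()
                .getMethod("create", Map.class, StreamExecutionEnvironment.class);
        Executor executor = (Executor) create.invoke(executorFactory, executorProps, env);

        Map<String, String> plannerProps = settings.toPlannerProperties();
        Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProps)
                .create(plannerProps, executor, tableConfig, functionCatalog, catalogManager);

        // isStreamingMode = false: a batch-mode environment that still exposes registerFunction
        StreamTableEnvironmentImpl tEnv = new StreamTableEnvironmentImpl(
                catalogManager, moduleManager, functionCatalog, tableConfig,
                env, planner, executor, false, classLoader);

        tEnv.registerFunction("firSendMsgFunc", new FirstSendMsgFunc());
    }
}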

Re: flink 1.11: TableEnvironment does not support registering an Aggregate Function?

lingchanhu
Thanks, that solved it!

BR,
lingchanhu



How can a source be fully reused under a statement set?

Jeff
In reply to this post by Jark
A question: with flink 1.11 statement sets, how can the same source be reused? I would like the different sinks in one job to consume exactly the same data, rather than the default hash-based splitting of the stream. Is there somewhere to configure this?
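
For context, a minimal sketch of the multi-sink statement-set setup being asked about, using hypothetical datagen and blackhole tables; whether the planner shares the source scan between the two INSERT statements is exactly the open question here:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // hypothetical tables, only to keep the sketch self-contained
        tEnv.executeSql("CREATE TABLE src (id INT) WITH ('connector' = 'datagen', 'rows-per-second' = '1')");
        tEnv.executeSql("CREATE TABLE sink_a (id INT) WITH ('connector' = 'blackhole')");
        tEnv.executeSql("CREATE TABLE sink_b (id INT) WITH ('connector' = 'blackhole')");

        // both INSERTs go into one StatementSet and are submitted as a single job
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink_a SELECT id FROM src");
        set.addInsertSql("INSERT INTO sink_b SELECT id FROM src");
        set.execute();
    }
}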