*flink1.11*
在TableEnvironment环境中注册并使用自定义的Aggregate Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment 注册和使用则是正常,这应该说明自定义的函数是ok的) org.apache.flink.table.api.TableException: Aggregate functions are not updated to the new type system yet. at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84) at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211) at java.util.function.Function.lambda$andThen$1(Function.java:88) at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178) at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651) at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616) at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) at org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511) at org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685) at com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48) at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36) *// 以下是代码* // main EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build(); TableEnvironment tEnv = TableEnvironment.create(envSettings); // 注册source table, jdbc table source tEnv.executeSql("CREATE TABLE wx_event_log (....) with ('connect.type'='jdbc'),...."); // 注册sink table,csv table sink tEnv.executeSql("CREATE TABLE wx_data_statistics (....) with ('connect.type'='filesystem','format.type'='csv',.....)"); // 注册agg function tEnv.createTemporarySystemFunction("firSendMsgFunc",new FirstSendMsgFunc()); Table table2 = tEnv.sqlQuery("select from_user,create_time from wx_event_log where msg_type='text' and create_time between '2020-03-20' and '2020-03-21'"); table2.groupBy($("from_user")) .aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today")) .select($("from_user"),$("first_send_msg_today")) .executeInsert("wx_data_statistics"); // 自定义agg function类 public class FirstSendMsgFunc extends AggregateFunction<LocalDateTime,CountDTO> { public void accumulate(CountDTO acc, LocalDateTime createTime) { if (acc.getDateTime() == null) { acc.setDateTime(createTime); } else if (acc.getDateTime().isAfter(createTime)) { acc.setDateTime(createTime); } } @Override public LocalDateTime getValue(CountDTO acc) { return acc.getDateTime(); } @Override public CountDTO createAccumulator() { return new CountDTO(); } } // accumulate pojo 类 public class CountDTO implements Serializable { private Integer count; private LocalDateTime dateTime; public Integer getCount() { return count; } public void setCount(Integer count) { this.count = count; } public LocalDateTime getDateTime() { return dateTime; } public void setDateTime(LocalDateTime dateTime) { this.dateTime = dateTime; } } -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Administrator
|
1.11 版本上 TableEnvironment#createTemporarySystemFunction 接口暂时还不支持
AggregateFunction。 你说 StreamTableEnvironment 可以,我估计你用的是 StreamTableEnvironment#registerFunction, 这个是支持 AggregateFunction 的。 Best, Jark On Wed, 18 Nov 2020 at 09:49, lingchanhu <[hidden email]> wrote: > *flink1.11* > 在TableEnvironment环境中注册并使用自定义的Aggregate > Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment > 注册和使用则是正常,这应该说明自定义的函数是ok的) > > org.apache.flink.table.api.TableException: Aggregate functions are not > updated to the new type system yet. > at > > org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152) > at > > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183) > at > > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112) > at > > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89) > at > > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) > at > > org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) > at > > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83) > at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267) > at > > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84) > at > > org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211) > at java.util.function.Function.lambda$andThen$1(Function.java:88) > at > > org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178) > at > > org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651) > at > > org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616) > at > > org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598) > at > > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) > at > > org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) > at > > org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511) > at > > org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685) > at > > com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48) > at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36) > > *// 以下是代码* > // main > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inBatchMode() > .build(); > > TableEnvironment tEnv = TableEnvironment.create(envSettings); > > // 注册source table, jdbc table source > tEnv.executeSql("CREATE TABLE wx_event_log (....) with > ('connect.type'='jdbc'),...."); > > // 注册sink table,csv table sink > tEnv.executeSql("CREATE TABLE wx_data_statistics (....) with > ('connect.type'='filesystem','format.type'='csv',.....)"); > > // 注册agg function > tEnv.createTemporarySystemFunction("firSendMsgFunc",new > FirstSendMsgFunc()); > > Table table2 = tEnv.sqlQuery("select from_user,create_time from > wx_event_log > where msg_type='text' and create_time between '2020-03-20' and > '2020-03-21'"); > > table2.groupBy($("from_user")) > > > .aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today")) > .select($("from_user"),$("first_send_msg_today")) > .executeInsert("wx_data_statistics"); > > > // 自定义agg function类 > public class FirstSendMsgFunc extends > AggregateFunction<LocalDateTime,CountDTO> { > > public void accumulate(CountDTO acc, LocalDateTime createTime) { > if (acc.getDateTime() == null) { > acc.setDateTime(createTime); > } else if (acc.getDateTime().isAfter(createTime)) { > acc.setDateTime(createTime); > } > } > > @Override > public LocalDateTime getValue(CountDTO acc) { > return acc.getDateTime(); > } > > @Override > public CountDTO createAccumulator() { > return new CountDTO(); > } > } > > // accumulate pojo 类 > public class CountDTO implements Serializable { > > private Integer count; > > private LocalDateTime dateTime; > > public Integer getCount() { > return count; > } > > public void setCount(Integer count) { > this.count = count; > } > > public LocalDateTime getDateTime() { > return dateTime; > } > > public void setDateTime(LocalDateTime dateTime) { > this.dateTime = dateTime; > } > } > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
Administrator
|
Btw, 1.12 版本 TableEnvironment#createTemporarySystemFunction 接口支持
AggregateFunction了。 On Wed, 18 Nov 2020 at 10:34, Jark Wu <[hidden email]> wrote: > 1.11 版本上 TableEnvironment#createTemporarySystemFunction 接口暂时还不支持 > AggregateFunction。 > 你说 StreamTableEnvironment 可以,我估计你用的是 > StreamTableEnvironment#registerFunction, 这个是支持 AggregateFunction 的。 > > Best, > Jark > > > On Wed, 18 Nov 2020 at 09:49, lingchanhu <[hidden email]> wrote: > >> *flink1.11* >> 在TableEnvironment环境中注册并使用自定义的Aggregate >> Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment >> 注册和使用则是正常,这应该说明自定义的函数是ok的) >> >> org.apache.flink.table.api.TableException: Aggregate functions are not >> updated to the new type system yet. >> at >> >> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152) >> at >> >> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183) >> at >> >> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112) >> at >> >> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89) >> at >> >> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) >> at >> >> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) >> at >> >> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83) >> at >> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267) >> at >> >> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) >> at >> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) >> at >> >> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) >> at >> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) >> at >> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) >> at >> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) >> at >> >> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84) >> at >> >> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211) >> at java.util.function.Function.lambda$andThen$1(Function.java:88) >> at >> >> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178) >> at >> >> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651) >> at >> >> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616) >> at >> >> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598) >> at >> >> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) >> at >> >> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) >> at >> >> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511) >> at >> >> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685) >> at >> >> com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48) >> at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36) >> >> *// 以下是代码* >> // main >> EnvironmentSettings envSettings = EnvironmentSettings.newInstance() >> .useBlinkPlanner() >> .inBatchMode() >> .build(); >> >> TableEnvironment tEnv = TableEnvironment.create(envSettings); >> >> // 注册source table, jdbc table source >> tEnv.executeSql("CREATE TABLE wx_event_log (....) with >> ('connect.type'='jdbc'),...."); >> >> // 注册sink table,csv table sink >> tEnv.executeSql("CREATE TABLE wx_data_statistics (....) with >> ('connect.type'='filesystem','format.type'='csv',.....)"); >> >> // 注册agg function >> tEnv.createTemporarySystemFunction("firSendMsgFunc",new >> FirstSendMsgFunc()); >> >> Table table2 = tEnv.sqlQuery("select from_user,create_time from >> wx_event_log >> where msg_type='text' and create_time between '2020-03-20' and >> '2020-03-21'"); >> >> table2.groupBy($("from_user")) >> >> >> .aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today")) >> .select($("from_user"),$("first_send_msg_today")) >> .executeInsert("wx_data_statistics"); >> >> >> // 自定义agg function类 >> public class FirstSendMsgFunc extends >> AggregateFunction<LocalDateTime,CountDTO> { >> >> public void accumulate(CountDTO acc, LocalDateTime createTime) { >> if (acc.getDateTime() == null) { >> acc.setDateTime(createTime); >> } else if (acc.getDateTime().isAfter(createTime)) { >> acc.setDateTime(createTime); >> } >> } >> >> @Override >> public LocalDateTime getValue(CountDTO acc) { >> return acc.getDateTime(); >> } >> >> @Override >> public CountDTO createAccumulator() { >> return new CountDTO(); >> } >> } >> >> // accumulate pojo 类 >> public class CountDTO implements Serializable { >> >> private Integer count; >> >> private LocalDateTime dateTime; >> >> public Integer getCount() { >> return count; >> } >> >> public void setCount(Integer count) { >> this.count = count; >> } >> >> public LocalDateTime getDateTime() { >> return dateTime; >> } >> >> public void setDateTime(LocalDateTime dateTime) { >> this.dateTime = dateTime; >> } >> } >> >> >> >> >> >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/ >> > |
In reply to this post by Jark
非常感谢,如果flink1.11 目前不支持的话,那对于这种场景的使用有什么建议么?想要批处理数据,其中又要用到自定义的agg function?
-- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Administrator
|
通过 StreamTableEnvironmentImpl 构造函数直接构造一个 isStreamingMode = false
的 StreamTableEnvironmentImpl。 然后就可以在这个上面调用 registerFunction 了。 On Wed, 18 Nov 2020 at 10:40, lingchanhu <[hidden email]> wrote: > 非常感谢,如果flink1.11 目前不支持的话,那对于这种场景的使用有什么建议么?想要批处理数据,其中又要用到自定义的agg function? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |