我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function signature prod(<NUMERIC>),请求大佬帮忙看看_(:з」∠)_
以下是代码: ----------------------------------------------------- ... stableEnv.createTemporarySystemFunction("prod", ProductAggregateFunction.class); Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as yldrate from queryData group by pf_id"); ... ----------------------------------------------------- @FunctionHint( input = @DataTypeHint("Double"), output = @DataTypeHint("Double") ) public class ProductAggregateFunction extends AggregateFunction<Double, Product> { @Override public Double getValue(Product acc) { return acc.prod; } @Override public Product createAccumulator() { return new Product(); } public void accumulate(Product acc, Double iValue) { acc.prod *= iValue; } public void retract(Product acc, Double iValue) { acc.prod /= iValue; } public void merge(Product acc, Iterable<Product> it) { for (Product p : it) { accumulate(acc, p.prod); } } public void resetAccumulator(Product acc) { acc.prod = 1D; } } |
捞一下自己,在线等大佬们的回复 _(:з」∠)_
在 2021-02-20 13:14:18,"xiaoyue" <[hidden email]> 写道: 我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function signature prod(<NUMERIC>),请求大佬帮忙看看_(:з」∠)_ 以下是代码: ----------------------------------------------------- ... stableEnv.createTemporarySystemFunction("prod", ProductAggregateFunction.class); Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as yldrate from queryData group by pf_id"); ... ----------------------------------------------------- @FunctionHint( input = @DataTypeHint("Double"), output = @DataTypeHint("Double") ) public class ProductAggregateFunction extends AggregateFunction<Double, Product> { @Override public Double getValue(Product acc) { return acc.prod; } @Override public Product createAccumulator() { return new Product(); } public void accumulate(Product acc, Double iValue) { acc.prod *= iValue; } public void retract(Product acc, Double iValue) { acc.prod /= iValue; } public void merge(Product acc, Iterable<Product> it) { for (Product p : it) { accumulate(acc, p.prod); } } public void resetAccumulator(Product acc) { acc.prod = 1D; } } |
应该是继承scalaFunction ?
在 2021-02-22 10:25:31,"xiaoyue" <[hidden email]> 写道: >捞一下自己,在线等大佬们的回复 _(:з」∠)_ > > > > > > > >在 2021-02-20 13:14:18,"xiaoyue" <[hidden email]> 写道: > >我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function signature prod(<NUMERIC>),请求大佬帮忙看看_(:з」∠)_ > >以下是代码: >----------------------------------------------------- >... > stableEnv.createTemporarySystemFunction("prod", ProductAggregateFunction.class); > Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as yldrate from queryData group by pf_id"); >... >----------------------------------------------------- >@FunctionHint( > input = @DataTypeHint("Double"), > output = @DataTypeHint("Double") >) >public class ProductAggregateFunction extends AggregateFunction<Double, Product> { > > > @Override > public Double getValue(Product acc) { > return acc.prod; > } > @Override > public Product createAccumulator() { > return new Product(); > } > public void accumulate(Product acc, Double iValue) { > acc.prod *= iValue; > } > public void retract(Product acc, Double iValue) { > acc.prod /= iValue; > } > public void merge(Product acc, Iterable<Product> it) { > for (Product p : it) { > accumulate(acc, p.prod); > } > } > public void resetAccumulator(Product acc) { > acc.prod = 1D; > } >} > > > > > > |
这里实现的是自定义聚合函数,而不是标量函数鸭_(:з」∠)_
------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2021年2月22日(星期一) 中午11:14 收件人: "user-zh"<[hidden email]>; 主题: Re:Re:SqlValidatorException: No match found for function signature prod(<NUMERIC>) 应该是继承scalaFunction ? 在 2021-02-22 10:25:31,"xiaoyue" <[hidden email]> 写道: >捞一下自己,在线等大佬们的回复 _(:з」∠)_ > > > > > > > >在 2021-02-20 13:14:18,"xiaoyue" <[hidden email]> 写道: > >我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function signature prod(<NUMERIC>),请求大佬帮忙看看_(:з」∠)_ > >以下是代码: >----------------------------------------------------- >... > stableEnv.createTemporarySystemFunction("prod", ProductAggregateFunction.class); > Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as yldrate from queryData group by pf_id"); >... >----------------------------------------------------- >@FunctionHint( > input = @DataTypeHint("Double"), > output = @DataTypeHint("Double") >) >public class ProductAggregateFunction extends AggregateFunction<Double, Product> { > > > @Override > public Double getValue(Product acc) { > return acc.prod; > } > @Override > public Product createAccumulator() { > return new Product(); > } > public void accumulate(Product acc, Double iValue) { > acc.prod *= iValue; > } > public void retract(Product acc, Double iValue) { > acc.prod /= iValue; > } > public void merge(Product acc, Iterable<Product> it) { > for (Product p : it) { > accumulate(acc, p.prod); > } > } > public void resetAccumulator(Product acc) { > acc.prod = 1D; > } >} > > > > > > |
Free forum by Nabble | Edit this page |