比如Top2问题,自定义一个Top2函数,想了一下2种方法不可行,
想法一,用flinksql实现,官网没有给如何在flinksql引用Table Aggregate
Functions的示例,只给出了在tableapi中的引用,所以不会写了
// call function "inline" without registration in Table API
env
.from("MyTable")
.groupBy($("myField"))
.flatAggregate(call(Top2.class, $("value")))
.select($("myField"), $("f0"), $("f1"));
// call function "inline" without registration in Table API
// but use an alias for a better naming of Tuple2's fields
env
.from("MyTable")
.groupBy($("myField"))
.flatAggregate(call(Top2.class, $("value")).as("value", "rank"))
.select($("myField"), $("value"), $("rank"));
// register function
env.createTemporarySystemFunction("Top2", Top2.class);
// call registered function in Table API
env
.from("MyTable")
.groupBy($("myField"))
.flatAggregate(call("Top2", $("value")).as("value", "rank"))
.select($("myField"), $("value"), $("rank"));
想法二:用tableapi实现,尝试发现over window的后面只能接select,不能接flatAggregate,所以不会写了
transactions
.window(Over.partitionBy($("CUST_NO")).orderBy($("TRADE_TIME")).preceding(lit(10).minutes()).as("w"))
.select();
请问下,有什么解决方案吗?还是说暂时不支持over窗口的多条输出
--
Sent from:
http://apache-flink.147419.n8.nabble.com/