Flink version: 1.11.1

I implemented a UDAF aggregate function that merges certain fields within a window into a single string. The code:

public class AggDistinctDetail extends AggregateFunction<String, AggDistinctDetail.Details> {
    private static final Logger logger = LoggerFactory.getLogger(AggDistinctDetail.class);

    public static class Details {
        public Set<String> set;
    }

    @Override
    public Details createAccumulator() {
        return new Details();
    }

    @Override
    public String getValue(Details acc) {
        return JSON.toJSONString(acc.set);
    }

    public void accumulate(Details acc, String val) {
        if (acc.set == null) {
            acc.set = new HashSet<>();
        }
        acc.set.add(val);
    }

    public void retract(Details acc, String val) {
        // for now, agg detail does not need to support retraction
    }

    public void merge(Details acc, Iterable<Details> it) {
        if (acc.set == null) {
            acc.set = new HashSet<>();
        }
        for (Details a : it) {
            acc.set.addAll(a.set);
        }
    }

    public void resetAccumulator(Details acc) {
        acc.set = null;
    }
}

I use this UDAF on an over window, partitioned by realIp and ordered by the event time requestDateTime of each message, with a range reaching back 24 hours. For each realIp it collects all userIds within the window and emits them as one aggregated string:

drop function if exists UDF_InfoDistinctMerge;
create function UDF_InfoDistinctMerge AS 'com.xx.risk.flink.udf.AggDistinctDetail';

select
    realIp,
    UDF_InfoDistinctMerge(userId) over w1 as userSet
from source_table
window w1 as (
    partition by realIp
    order by requestDateTime asc
    RANGE BETWEEN INTERVAL '24' hour preceding AND CURRENT ROW);

In testing, the aggregated string userSet keeps growing: userIds whose events are already older than the 24-hour window are still aggregated into userSet, which does not match my expectation.

Question: is there a problem with the UDAF implementation above, or is this a bug in UDAFs on over windows?
Hi,
It looks like you have not implemented the `retract` method. Normally, when an over window expires data, it runs a retract computation over the expired rows, so you need to implement retract correctly.

--
Best,
Benchao Li
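For reference, a minimal sketch of what a retractable version of the accumulator could look like, assuming the same fastjson JSON helper as the original. It is only a sketch: it swaps the Set accumulator for a count map, because a plain HashSet cannot retract correctly when the same userId occurs more than once inside the window (removing one occurrence would drop the value entirely).

import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import org.apache.flink.table.functions.AggregateFunction;

public class AggDistinctDetail extends AggregateFunction<String, AggDistinctDetail.Details> {

    public static class Details {
        // value -> number of live occurrences in the window, so a value is
        // only dropped once all of its occurrences have been retracted
        public Map<String, Long> counts = new HashMap<>();
    }

    @Override
    public Details createAccumulator() {
        return new Details();
    }

    @Override
    public String getValue(Details acc) {
        // emit only the distinct values, as the original version did
        return JSON.toJSONString(acc.counts.keySet());
    }

    public void accumulate(Details acc, String val) {
        acc.counts.merge(val, 1L, Long::sum);
    }

    public void retract(Details acc, String val) {
        Long cnt = acc.counts.get(val);
        if (cnt == null) {
            return; // retract for a value we never saw; ignore
        }
        if (cnt <= 1L) {
            acc.counts.remove(val); // last occurrence left the window
        } else {
            acc.counts.put(val, cnt - 1L);
        }
    }

    public void merge(Details acc, Iterable<Details> it) {
        for (Details other : it) {
            other.counts.forEach((k, v) -> acc.counts.merge(k, v, Long::sum));
        }
    }
}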
In reply to this post by Benchao Li-2
Is there an article that describes how expired state is cleaned up? Do users need to configure a TTL themselves?
For example, I have a TopN computation. How should its expired data be cleaned up? (Or is it handled automatically?)

SELECT cnt, word, time_hour
FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY time_hour ORDER BY cnt desc) AS rownum
    FROM test_word_count)
WHERE rownum <= 100;

--
**************************************
tivanli
**************************************
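For reference: for unbounded queries like the TopN above, Flink SQL does not expire per-key state on its own unless idle state retention is configured. In 1.11 the usual knob is TableConfig#setIdleStateRetentionTime. A minimal sketch, with placeholder retention values of 12 and 24 hours:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Keep per-key state at least 12 hours and at most 24 hours after
        // its last access; keys idle longer than that have state cleared.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));

        // ... register source tables and run the TopN query here ...
    }
}

Note that this is idle-state cleanup, not window semantics: if a key's state is cleared and new data for that key arrives later, its aggregation is computed from scratch.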