|
我自定义了一个 AggregateFunction,用 HLL(HyperLogLog)实现 UV 的近似计算。问题是 HyperLogLog 来自第三方包,如何让 Flink 识别这个累加器类型?
我不知道对应的 TypeInformation 该如何写。
代码如下:
import io.airlift.slice.Slices;
import io.airlift.stats.cardinality.HyperLogLog;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import org.apache.flink.table.functions.AggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Flink table {@link AggregateFunction} that estimates the number of distinct strings it has
 * seen, using a {@link HyperLogLog} sketch as the accumulator.
 *
 * <p>The result is the sketch's {@code cardinality()}, i.e. an approximate distinct count.
 */
public class FlinkUDAFCardinalityEstimationFunction extends AggregateFunction<Long, HyperLogLog> {

    // Use this class as the logger name so log lines are attributed to the right component.
    private static final Logger LOG =
            LoggerFactory.getLogger(FlinkUDAFCardinalityEstimationFunction.class);

    /** Bucket count handed to {@code HyperLogLog.newInstance} for every new sketch. */
    private static final int NUMBER_OF_BUCKETS = 4096;

    /**
     * Creates an empty sketch.
     *
     * @return a new {@link HyperLogLog} with {@link #NUMBER_OF_BUCKETS} buckets
     */
    @Override
    public HyperLogLog createAccumulator() {
        return HyperLogLog.newInstance(NUMBER_OF_BUCKETS);
    }

    /**
     * Returns the estimated distinct count held by the sketch.
     *
     * @param acc the accumulator, may be {@code null}
     * @return {@code 0} when {@code acc} is {@code null}, otherwise {@code acc.cardinality()}
     */
    @Override
    public Long getValue(HyperLogLog acc) {
        if (acc == null) {
            return 0L;
        }
        return acc.cardinality();
    }

    /**
     * Adds one value to the sketch. {@code null} values are ignored.
     *
     * @param acc the accumulator to update
     * @param element the value to count; hashed from its UTF-8 bytes
     */
    public void accumulate(HyperLogLog acc, String element) {
        if (element == null) {
            return;
        }
        acc.add(Slices.utf8Slice(element));
    }

    /**
     * Retraction hook. The sketch is left untouched; the retracted value is only logged.
     *
     * <p>NOTE(review): this overload takes {@code byte[]} while {@link #accumulate} takes
     * {@code String} - confirm which argument type the planner is expected to pass here.
     *
     * @param acc the accumulator (not modified)
     * @param element the retracted value, may be {@code null}
     */
    public void retract(HyperLogLog acc, byte[] element) {
        if (!LOG.isInfoEnabled()) {
            return;
        }
        // Guard against null (new String(null) would throw) and decode as UTF-8 to match
        // the encoding used by accumulate().
        String text = element == null ? "null" : new String(element, StandardCharsets.UTF_8);
        LOG.info("-- retract:{}", text);
    }

    /**
     * Merges every non-null sketch from {@code it} into {@code acc}.
     *
     * @param acc the accumulator that receives the merged state
     * @param it the sketches to merge in; {@code null} entries are skipped
     */
    public void merge(HyperLogLog acc, Iterable<HyperLogLog> it) {
        if (it == null) {
            return;
        }
        for (HyperLogLog other : it) {
            if (other != null) {
                acc.mergeWith(other);
            }
        }
    }

    /**
     * Reset hook. Always throws.
     *
     * <p>The previous body assigned a fresh sketch to the {@code acc} parameter. Java passes
     * references by value, so that assignment never reached the caller's object and the
     * accumulator silently kept its old contents. None of the {@link HyperLogLog} methods used
     * in this class can clear a sketch in place, so this fails loudly instead of returning
     * with stale state. Supporting reset would need a mutable holder type as the accumulator.
     *
     * @param acc the accumulator that was asked to be reset (not modified)
     * @throws UnsupportedOperationException always
     */
    public void resetAccumulator(HyperLogLog acc) {
        throw new UnsupportedOperationException(
                "resetAccumulator is not supported: a HyperLogLog accumulator cannot be cleared in place");
    }
}
|