Below is a piece of code I wrote. In processElement, every time a log record arrives I register an event-time timer: ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
If onTimer fires, it means every log record in the window has been processed, and I then want to compute the CTR over all the data in that window. But onTimer never fires. When I use KeyedProcessFunction instead, it does fire. What is going on?

    public class TopCTRProcessFunction
            extends KeyedBroadcastProcessFunction<Long, ItemViewCount, Tuple2<String, TopCTRInfo>, String> {

        private ListState<ItemViewCount> itemState;
        private MapStateDescriptor<String, TopCTRInfo> ruleMapStateDescr =
                new MapStateDescriptor<>("ruleMapState", String.class, TopCTRInfo.class);

        @Override
        public void open(Configuration parameters) throws Exception {
            ListStateDescriptor<ItemViewCount> itemsStateDesc =
                    new ListStateDescriptor<>("itemState-state", ItemViewCount.class);
            itemState = getRuntimeContext().getListState(itemsStateDesc);
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (itemState != null) {
                itemState.clear();
                itemState = null;
            }
        }

        @Override
        public void processElement(ItemViewCount value, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            // Compute the CTR
            value.setCtr(value.getClkCount() == 0
                    ? 0
                    : ((double) value.getClkCount() / value.getImpCount() * 100));
            itemState.add(value);
            // Register an event-time timer at windowEnd + 1. When it fires, all item data
            // belonging to the windowEnd window has been collected.
            ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
        }

        @Override
        public void processBroadcastElement(Tuple2<String, TopCTRInfo> value, Context ctx,
                Collector<String> out) throws Exception {
            ctx.getBroadcastState(ruleMapStateDescr).put(value.f0, value.f1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
                throws Exception {
            ReadOnlyBroadcastState<String, TopCTRInfo> topCTRMap =
                    ctx.getBroadcastState(ruleMapStateDescr);
            TopCTRInfo topCTRInfo = topCTRMap.get(Constant.REDIS_BROADCAST_TOPCTR_KEY);
            if (topCTRInfo == null) {
                topCTRInfo = new TopCTRInfo();
            }
            List<ItemViewCount> allItems = new ArrayList<>();
            List<ItemViewCount> jsonItems = new ArrayList<>();
            for (ItemViewCount item : itemState.get()) {
                if (item.getCtr() >= topCTRInfo.getCtrAlertThreshold()) {
                    allItems.add(item);
                }
            }
            itemState.clear();
            allItems.sort(new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    return (int) ((o2.getCtr() - o1.getCtr()) * 100);
                }
            });
            int totalSize = topCTRInfo.getTopSize() > allItems.size()
                    ? allItems.size()
                    : topCTRInfo.getTopSize();
            for (int i = 0; i < totalSize; i++) {
                jsonItems.add(allItems.get(i));
            }
            out.collect(JSON.toJSONString(jsonItems));
        }
    }

Below is the code that does trigger onTimer, using KeyedProcessFunction:

    .process(new KeyedProcessFunction<Long, ItemViewCount, String>() {

        private final int topSize = 10;
        private ListState<ItemViewCount> itemState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ListStateDescriptor<ItemViewCount> itemsStateDesc =
                    new ListStateDescriptor<>("itemState-state", ItemViewCount.class);
            itemState = getRuntimeContext().getListState(itemsStateDesc);
        }

        @Override
        public void processElement(ItemViewCount value, Context ctx, Collector<String> out)
                throws Exception {
            // Compute the CTR
            value.setCtr(value.getClkCount() == 0
                    ? 0
                    : ((double) value.getClkCount() / value.getImpCount() * 100));
            itemState.add(value);
            // Register an event-time timer at windowEnd + 1. When it fires, all item data
            // belonging to the windowEnd window has been collected.
            ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
                throws Exception {
            super.onTimer(timestamp, ctx, out);
            List<ItemViewCount> allItems = new ArrayList<>();
            List<ItemViewCount> jsonItems = new ArrayList<>();
            for (ItemViewCount item : itemState.get()) {
                allItems.add(item);
            }
            itemState.clear();
            allItems.sort(new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    return (int) ((o2.getCtr() - o1.getCtr()) * 100);
                }
            });
            int totalSize = topSize > allItems.size() ? allItems.size() : topSize;
            for (int i = 0; i < totalSize; i++) {
                jsonItems.add(allItems.get(i));
            }
            out.collect(JSON.toJSONString(jsonItems));
        }
    })
I also looked up where onTimer is referenced. For KeyedProcessFunction, the references are as follows:

For KeyedBroadcastProcessFunction, the references are as follows:

The onTimer method signature in KeyedProcessFunction is:

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

The onTimer method signature in KeyedBroadcastProcessFunction is:

    public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<OUT> out) throws Exception {}
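One thing worth checking, offered only as a possible cause because the post does not show how the two streams are connected or whether the broadcast stream carries watermarks: a Flink operator with two inputs advances its event-time clock to the minimum of the watermarks received on both inputs. If the broadcast input never emits a watermark, that minimum stays at Long.MIN_VALUE and event-time timers never fire, whereas a single-input KeyedProcessFunction only depends on its one input. The sketch below is a plain-Java model of that rule, not Flink code; the class name TwoInputWatermarkSketch and all of its methods are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model (NOT Flink API) of event time in a two-input operator.
// All names here are hypothetical and chosen for illustration only.
class TwoInputWatermarkSketch {
    // Each input is at Long.MIN_VALUE until it delivers its first watermark.
    private long keyedInputWatermark = Long.MIN_VALUE;
    private long broadcastInputWatermark = Long.MIN_VALUE;

    // A sorted set: registering the same timestamp twice keeps a single timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<Long> fired = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    void onKeyedInputWatermark(long watermark) {
        keyedInputWatermark = Math.max(keyedInputWatermark, watermark);
        advance();
    }

    void onBroadcastInputWatermark(long watermark) {
        broadcastInputWatermark = Math.max(broadcastInputWatermark, watermark);
        advance();
    }

    // The operator's event time is the minimum over both inputs.
    long currentWatermark() {
        return Math.min(keyedInputWatermark, broadcastInputWatermark);
    }

    // Fire every timer whose timestamp is at or below the combined watermark.
    private void advance() {
        long watermark = currentWatermark();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
    }

    List<Long> firedTimers() {
        return fired;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        op.registerEventTimeTimer(1001L);      // e.g. windowEnd + 1
        op.onKeyedInputWatermark(5000L);       // only the keyed input advances
        System.out.println("after keyed watermark only: " + op.firedTimers());
        op.onBroadcastInputWatermark(Long.MAX_VALUE); // broadcast input stops holding time back
        System.out.println("after broadcast watermark:  " + op.firedTimers());
    }
}
```

If this turns out to be the cause, the usual remedy is to make the broadcast stream emit watermarks as well, for instance by assigning timestamps and watermarks to it so that it does not hold event time back. Whether that applies here depends on how the broadcast source is built, which the post does not show.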