flink KeyedBroadcastProcessFunction 不能触发onTimer

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

flink KeyedBroadcastProcessFunction 不能触发onTimer

athlon512@gmail.com
以下是我写了的一段代码,在 processElement 中每次进来一条日志就注册一次 onTimer事件 ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
如果onTimer触发则说明窗口内所有日志都被处理然后我要对窗口内的所有数据进行ctr计算但是onTimer不触发,我使用 KeyedProcessFunction 就可以触发,这是怎么回事呢?

public class TopCTRProcessFunction extends KeyedBroadcastProcessFunction<Long, ItemViewCount, Tuple2<String, TopCTRInfo>, String> {
    private ListState<ItemViewCount> itemState;
    private MapStateDescriptor<String, TopCTRInfo> ruleMapStateDescr =
            new MapStateDescriptor<>("ruleMapState", String.class, TopCTRInfo.class);

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
                "itemState-state",
                ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (itemState != null) {
            itemState.clear();
            itemState = null;
        }
    }

    @Override
    public void processElement(ItemViewCount value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        //计算ctr
        value.setCtr(value.getClkCount() == 0 ? 0 : ((double) value.getClkCount() / value.getImpCount() * 100));
        itemState.add(value);
        // 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
        ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
    }

    @Override
    public void processBroadcastElement(Tuple2<String, TopCTRInfo> value, Context ctx, Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleMapStateDescr).put(value.f0, value.f1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        ReadOnlyBroadcastState<String, TopCTRInfo> topCTRMap = ctx.getBroadcastState(ruleMapStateDescr);
        TopCTRInfo topCTRInfo = topCTRMap.get(Constant.REDIS_BROADCAST_TOPCTR_KEY);
        if (topCTRInfo == null) {
            topCTRInfo = new TopCTRInfo();
        }
        List<ItemViewCount> allItems = new ArrayList<>();
        List<ItemViewCount> jsonItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            if (item.getCtr() >= topCTRInfo.getCtrAlertThreshold()) {
                allItems.add(item);
            }
        }
        itemState.clear();
        allItems.sort(new Comparator<ItemViewCount>() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {
                return (int) ((o2.getCtr() - o1.getCtr()) * 100);
            }
        });
        int totalSize = topCTRInfo.getTopSize() > allItems.size() ? allItems.size() : topCTRInfo.getTopSize();
        for (int i = 0; i < totalSize; i++) {
            jsonItems.add(allItems.get(i));
        }
        out.collect(JSON.toJSONString(jsonItems));
    }
}
以下是可以触发onTimer的代码 使用 KeyedProcessFunction
.process(new KeyedProcessFunction<Long, ItemViewCount, String>() {
    private final int topSize = 10;
    private ListState<ItemViewCount> itemState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
                "itemState-state",
                ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
        //计算ctr
        value.setCtr(value.getClkCount() == 0 ? 0 : ((double) value.getClkCount() / value.getImpCount() * 100));
        itemState.add(value);
        // 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
        ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        List<ItemViewCount> allItems = new ArrayList<>();
        List<ItemViewCount> jsonItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        itemState.clear();
        allItems.sort(new Comparator<ItemViewCount>() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {
                return (int) ((o2.getCtr() - o1.getCtr()) * 100);
            }
        });
        int totalSize = topSize > allItems.size() ? allItems.size() : topSize;
        for (int i = 0; i < totalSize; i++) {
            jsonItems.add(allItems.get(i));
        }
        out.collect(JSON.toJSONString(jsonItems));
    }

})


[hidden email]
Reply | Threaded
Open this post in threaded view
|

回复: flink KeyedBroadcastProcessFunction 不能触发onTimer

athlon512@gmail.com
而且我发现查找了下 KeyedProcessFunction 的onTimer引用对应如下: 

KeyedBroadcastProcessFunction 的onTimer引用对应如下:


KeyedProcessFunction 的onTimer方法签名是这样的:
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
KeyedBroadcastProcessFunction 的onTimer方法签名是这样的:

public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<OUT> out) throws Exception {
// the default implementation does nothing.
}


 
发件人: [hidden email]
发送时间: 2019-08-14 10:14
收件人: [hidden email]
主题: flink KeyedBroadcastProcessFunction 不能触发onTimer
以下是我写了的一段代码,在 processElement 中每次进来一条日志就注册一次 onTimer事件 ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
如果onTimer触发则说明窗口内所有日志都被处理然后我要对窗口内的所有数据进行ctr计算但是onTimer不触发,我使用 KeyedProcessFunction 就可以触发,这是怎么回事呢?

public class TopCTRProcessFunction extends KeyedBroadcastProcessFunction<Long, ItemViewCount, Tuple2<String, TopCTRInfo>, String> {
private ListState<ItemViewCount> itemState;
private MapStateDescriptor<String, TopCTRInfo> ruleMapStateDescr =
new MapStateDescriptor<>("ruleMapState", String.class, TopCTRInfo.class);

@Override
public void open(Configuration parameters) throws Exception {
ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
"itemState-state",
ItemViewCount.class);
itemState = getRuntimeContext().getListState(itemsStateDesc);
}

@Override
public void close() throws Exception {
super.close();
if (itemState != null) {
itemState.clear();
itemState = null;
}
}

@Override
public void processElement(ItemViewCount value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
//计算ctr
value.setCtr(value.getClkCount() == 0 ? 0 : ((double) value.getClkCount() / value.getImpCount() * 100));
itemState.add(value);
// 注册 windowEnd+1 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
}

@Override
public void processBroadcastElement(Tuple2<String, TopCTRInfo> value, Context ctx, Collector<String> out) throws Exception {
ctx.getBroadcastState(ruleMapStateDescr).put(value.f0, value.f1);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
ReadOnlyBroadcastState<String, TopCTRInfo> topCTRMap = ctx.getBroadcastState(ruleMapStateDescr);
TopCTRInfo topCTRInfo = topCTRMap.get(Constant.REDIS_BROADCAST_TOPCTR_KEY);
if (topCTRInfo == null) {
topCTRInfo = new TopCTRInfo();
}
List<ItemViewCount> allItems = new ArrayList<>();
List<ItemViewCount> jsonItems = new ArrayList<>();
for (ItemViewCount item : itemState.get()) {
if (item.getCtr() >= topCTRInfo.getCtrAlertThreshold()) {
allItems.add(item);
}
}
itemState.clear();
allItems.sort(new Comparator<ItemViewCount>() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
return (int) ((o2.getCtr() - o1.getCtr()) * 100);
}
});
int totalSize = topCTRInfo.getTopSize() > allItems.size() ? allItems.size() : topCTRInfo.getTopSize();
for (int i = 0; i < totalSize; i++) {
jsonItems.add(allItems.get(i));
}
out.collect(JSON.toJSONString(jsonItems));
}
}
以下是可以触发onTimer的代码 使用 KeyedProcessFunction 
.process(new KeyedProcessFunction<Long, ItemViewCount, String>() {
private final int topSize = 10;
private ListState<ItemViewCount> itemState;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
"itemState-state",
ItemViewCount.class);
itemState = getRuntimeContext().getListState(itemsStateDesc);
}

@Override
public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
//计算ctr
value.setCtr(value.getClkCount() == 0 ? 0 : ((double) value.getClkCount() / value.getImpCount() * 100));
itemState.add(value);
// 注册 windowEnd+1 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
List<ItemViewCount> allItems = new ArrayList<>();
List<ItemViewCount> jsonItems = new ArrayList<>();
for (ItemViewCount item : itemState.get()) {
allItems.add(item);
}
itemState.clear();
allItems.sort(new Comparator<ItemViewCount>() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
return (int) ((o2.getCtr() - o1.getCtr()) * 100);
}
});
int totalSize = topSize > allItems.size() ? allItems.size() : topSize;
for (int i = 0; i < totalSize; i++) {
jsonItems.add(allItems.get(i));
}
out.collect(JSON.toJSONString(jsonItems));
}

})