Hi,
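I want to use Flink's Session Window to compute each user's online visit duration and to count the number of currently active users. Right now a record containing the window start and end time is only emitted when the window ends. How can I emit a record as soon as the window is created, and then update that record when the window ends? Thanks!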
This probably can't be done at the moment. Alternatively, maybe you could implement it with two queries?
For example, one query computes first_value, and the second is the actual session window.

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
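Benchao's two-query idea can be modelled without Flink. The sketch below is plain Java, not Flink API code, and rests on assumptions: events belong to a single user and arrive in event-time order, the session gap is 30 s (suggested by the 30000 ms intervals in the log later in this thread), and the names `TwoQuerySketch`, `startRecords` and `closedSessions` are hypothetical. The first query is approximated here as "emit the first event of each session immediately", and the second emits the closed session once the gap has passed; a downstream sink keyed on (user, session start) would let the final record overwrite the early one.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the two-query idea; not Flink code.
class TwoQuerySketch {
    // Hypothetical input: one activity event of one user, event time in ms.
    static final class Event {
        final String user;
        final long ts;
        Event(String user, long ts) { this.user = user; this.ts = ts; }
    }

    // Hypothetical output: a closed session [start, end), with end = last event + gap.
    static final class Session {
        final String user;
        final long start;
        final long end;
        Session(String user, long start, long end) { this.user = user; this.start = start; this.end = end; }
        long durationMs() { return end - start; }
    }

    static final long GAP_MS = 30_000L; // assumed session gap

    // "Query 1": emit the first event of each session as soon as it is seen.
    static List<Event> startRecords(List<Event> events) {
        List<Event> out = new ArrayList<>();
        Event prev = null;
        for (Event e : events) {
            if (prev == null || e.ts - prev.ts > GAP_MS) {
                out.add(e); // early "session started" record
            }
            prev = e;
        }
        return out;
    }

    // "Query 2": the real session window, emitted only once a session is closed.
    static List<Session> closedSessions(List<Event> events) {
        List<Session> out = new ArrayList<>();
        Event first = null;
        Event prev = null;
        for (Event e : events) {
            if (prev != null && e.ts - prev.ts > GAP_MS) {
                // gap exceeded: the previous session is complete
                out.add(new Session(first.user, first.ts, prev.ts + GAP_MS));
                first = null;
            }
            if (first == null) {
                first = e;
            }
            prev = e;
        }
        if (first != null) {
            // In a real stream this last session would only close once the watermark passes its end.
            out.add(new Session(first.user, first.ts, prev.ts + GAP_MS));
        }
        return out;
    }
}
```

For example, events at 0, 3 s, 6 s and 100 s give two early records (at 0 and 100 s) and two closed sessions lasting 36 s and 30 s.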
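I'd like to implement this with a trigger. The idea is to override EventTimeTrigger so that when the first record enters the system it triggers a purge, tracking that in a ValueState. But the problems I'm running into are that it fires repeatedly, many times, and when the window finally closes and fires, the records the WindowFunction receives are also wrong. Am I using it incorrectly?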
Custom trigger:

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SessionComputeTrigger extends Trigger<SessionConvertBean, TimeWindow> {
    private static final long serialVersionUID = 1L;
    static Logger logger = LoggerFactory.getLogger(SessionComputeTrigger.class);

    // Per-window counter (ReducingState is a mergeable state type)
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new SessionComputeTrigger.Sum(), LongSerializer.INSTANCE);

    // Per-window flag "already fired early" (ValueState is not a mergeable state type)
    private final ValueStateDescriptor<Boolean> existDesc =
            new ValueStateDescriptor<>("exist", BooleanSerializer.INSTANCE);

    private SessionComputeTrigger() {
    }

    @Override
    public TriggerResult onElement(SessionConvertBean element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        ValueState<Boolean> exist = ctx.getPartitionedState(existDesc);
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);

        // No flag in this window's trigger state yet: fire early once
        if (exist == null || exist.value() == null || !exist.value()) {
            count.add(1L);
            logger.info("mau fire =====>exits:{} ====>count:{}====>date_time:{}",
                    exist.value(), count.get(), element.dateTime);
            exist.update(true);
            return TriggerResult.FIRE;
        }

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // Re-registers the end-of-window timer for the merged window; trigger state is not merged here
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    public static SessionComputeTrigger create() {
        return new SessionComputeTrigger();
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            logger.info("value1={},value2={}", value1, value2);
            return value1 + value2;
        }
    }
}
```

Output:

mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:40.298+0800 (date_time is the event time of the current record)
result=>user:48b132cb-f40b-4cf5-bf7f-b6411525b1982,start time:2020-05-10T23:22:40.298+0800,end time:2020-05-10T23:23:10.298+0800,interval:30000.0,total:1
mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:43.298+0800
result=>user:48b132cb-f40b-4cf5-bf7f-b6411525b1982,start time:2020-05-10T23:22:40.298+0800,end time:2020-05-10T23:23:13.298+0800,interval:33000.0,total:2
mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:46.298+0800
result=>user:48b132cb-f40b-4cf5-bf7f-b6411525b1982,start time:2020-05-10T23:22:40.298+0800,end time:2020-05-10T23:23:16.298+0800,interval:36000.0,total:3
mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:49.298+0800
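A plausible explanation for the repeated firing, based on how merging windows work in the DataStream API (not confirmed in this thread): with session windows every element is first assigned its own window [ts, ts + gap) and then merged into the running session, and the merged window is a new window as far as trigger state is concerned. Trigger state only survives a merge if `onMerge` copies it with `ctx.mergePartitionedState(...)`, which only accepts mergeable state such as `ReducingState`; `ValueState` cannot be merged at all. The posted `onMerge` merges neither descriptor, so each new element finds `exist` empty and fires again, which would match the `exits:null ====>count:1` line before every result. Because `FIRE` does not purge, each firing hands the WindowFunction everything collected so far (total 1, 2, 3 in the log). In Flink terms a fix would likely be: keep only the `ReducingState` counter, call `ctx.mergePartitionedState(countDesc)` in `onMerge`, fire early only when the counter is 1 after `count.add(1L)`, register the `window.maxTimestamp()` timer on the early-fire path too, and clear the counter in `clear()`. The plain-Java model below (hypothetical class `MergingTriggerModel`, assumed 30 s gap, events of one key in event-time order) is not Flink code; it only reproduces the state-loss effect.

```java
// Plain-Java model of per-window trigger state in a merging session window; not Flink code.
class MergingTriggerModel {
    static final long GAP_MS = 30_000L; // assumed session gap

    // Returns how many early firings happen for one key's events (in event-time order).
    // mergeState = true  ~ counter merged into the new window via ctx.mergePartitionedState(...)
    // mergeState = false ~ state not carried over to the merged window (as in the posted trigger)
    static int earlyFirings(long[] timestamps, boolean mergeState) {
        long start = 0;
        long end = 0;          // current session window [start, end)
        boolean open = false;
        Long count = null;     // trigger state of the current window (null = empty)
        int firings = 0;
        for (long ts : timestamps) {
            long newEnd = ts + GAP_MS;
            if (open && ts <= end) {
                long mergedStart = Math.min(start, ts);
                long mergedEnd = Math.max(end, newEnd);
                if (mergedStart != start || mergedEnd != end) {
                    // The merged window is a different window, i.e. a fresh trigger-state namespace;
                    // only state explicitly merged in onMerge survives.
                    count = mergeState ? count : null;
                }
                start = mergedStart;
                end = mergedEnd;
            } else {
                // First event or gap exceeded: a brand-new session with empty trigger state.
                start = ts;
                end = newEnd;
                open = true;
                count = null;
            }
            // onElement: count the element and fire early only if it is the first one in the state.
            count = (count == null ? 0L : count) + 1L;
            if (count == 1L) {
                firings++;
            }
        }
        return firings;
    }
}
```

With the four event times from the log (offsets 0, 3 s, 6 s and 9 s), dropping the state on merge yields four early firings, one per "mau fire" line, while carrying the counter over yields a single early firing per session.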
It looks like you are not using the blink planner, because the Trigger in the blink planner is not the same as the Trigger in the DataStream API.
So if you are using the legacy planner, this approach might work (I'm not familiar with the legacy planner). But if you move to the blink planner later, this probably won't be possible.
OK, I'll look into it some more. Thanks.