Flink Session Window在窗口创建的时候输出一条数据

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink Session Window在窗口创建的时候输出一条数据

苟刚
Hi,

    我想使用Flink的Session Window 去统计一个用户的在线访问时长,和统计当前活跃的用户数量,目前只能在window结束的时候才输出一条包含窗口开始和结束时间的记录,怎么在窗口创建的时候就先输出一条记录,结束的时候再去更新这条记录呢,谢谢!

Reply | Threaded
Open this post in threaded view
|

Re: Flink Session Window在窗口创建的时候输出一条数据

Benchao Li
这个暂时应该是没有办法做到这一点。或者你可以用两个query来实现这个?
比如一个query是统计first_value;第二个是真正的session window。

gang.gou <[hidden email]> 于2020年5月11日周一 下午3:07写道:

> Hi,
>
>     我想使用Flink的Session Window
> 去统计一个用户的在线访问时长,和统计当前活跃的用户数量,目前只能在window结束的时候才输出一条包含窗口开始和结束时间的记录,怎么在窗口创建的时候就先输出一条记录,结束的时候再去更新这条记录呢,谢谢!
>
>

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Flink Session Window在窗口创建的时候输出一条数据

苟刚
我想通过trigger的方式来实现这个需求,想法是重写EventTimeTrigger  ,在首条记录进入系统时,触发一下purge,通过ValueStatus 记录状态。但是现在遇到的问题是,会重复触发多少。而且窗口关闭时的触发,WindowFunction Function收到的记录也不对。请问是我使用方式不对吗?

自定义的trigger:
public class SessionComputeTrigger extends Trigger<SessionConvertBean, TimeWindow> {
    private static final long serialVersionUID = 1L;
    static Logger logger = LoggerFactory.getLogger(SessionComputeTrigger.class);
    private final ReducingStateDescriptor<Long> countDesc = new ReducingStateDescriptor<>("count", new SessionComputeTrigger.Sum(), LongSerializer.INSTANCE);

    private final ValueStateDescriptor<Boolean> existDesc = new ValueStateDescriptor<>("exist", BooleanSerializer.INSTANCE);

    private SessionComputeTrigger() {
    }

    @Override
    public TriggerResult onElement(SessionConvertBean element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Boolean> exist = ctx.getPartitionedState(existDesc);
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);

        if (exist == null || exist.value() == null || !exist.value()) {
            count.add(1L);
            logger.info("mau fire =====>exits:{} ====>count:{}====>date_time:{}", exist.value(), count.get(), element.dateTime);
            exist.update(true);
            return TriggerResult.FIRE;
        }

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    public static SessionComputeTrigger create() {
        return new SessionComputeTrigger();
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            logger.info("value1={},value2={}", value1, value2);
            return value1 + value2;
        }

    }
}

输出值:

mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:40.298+0800  (date_time是当前记录的eventtime)
  result=>用户:48b132cb-f40b-4cf5-bf7f-b6411525b1982,开始时间:2020-05-10T23:22:40.298+0800,结束时间:2020-05-10T23:23:10.298+0800,间隔:30000.0,总数:1
mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:43.298+0800
 result=>用户:48b132cb-f40b-4cf5-bf7f-b6411525b1982,开始时间:2020-05-10T23:22:40.298+0800,结束时间:2020-05-10T23:23:13.298+0800,间隔:33000.0,总数:2
mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:46.298+0800
 result=>用户:48b132cb-f40b-4cf5-bf7f-b6411525b1982,开始时间:2020-05-10T23:22:40.298+0800,结束时间:2020-05-10T23:23:16.298+0800,间隔:36000.0,总数:3
mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:49.298+0800



在 2020/5/11 下午4:12,“Benchao Li”<user-zh-return-3480-gougang_1991=[hidden email] 代表 [hidden email]> 写入:

    这个暂时应该是没有办法做到这一点。或者你可以用两个query来实现这个?
    比如一个query是统计first_value;第二个是真正的session window。

    gang.gou <[hidden email]> 于2020年5月11日周一 下午3:07写道:

    > Hi,
    >
    >     我想使用Flink的Session Window
    > 去统计一个用户的在线访问时长,和统计当前活跃的用户数量,目前只能在window结束的时候才输出一条包含窗口开始和结束时间的记录,怎么在窗口创建的时候就先输出一条记录,结束的时候再去更新这条记录呢,谢谢!
    >
    >

    --

    Benchao Li
    School of Electronics Engineering and Computer Science, Peking University
    Tel:+86-15650713730
    Email: [hidden email]; [hidden email]


Reply | Threaded
Open this post in threaded view
|

Re: Flink Session Window在窗口创建的时候输出一条数据

Benchao Li
看起来你使用的应该不是blink planner,因为blink planner里的Trigger跟DataStream里面的Trigger是不一样的。
所以如果你用的是legacy planner,可能这个方法是可行的。(我对legacy planner不熟悉)
但是如果你以后转到blink planner,这个应该是搞不了的。

gang.gou <[hidden email]> 于2020年5月12日周二 上午11:18写道:

> 我想通过trigger的方式来实现这个需求,想法是重写EventTimeTrigger
> ,在首条记录进入系统时,触发一下purge,通过ValueStatus
> 记录状态。但是现在遇到的问题是,会重复触发多少。而且窗口关闭时的触发,WindowFunction
> Function收到的记录也不对。请问是我使用方式不对吗?
>
> 自定义的trigger:
> public class SessionComputeTrigger extends Trigger<SessionConvertBean,
> TimeWindow> {
>     private static final long serialVersionUID = 1L;
>     static Logger logger =
> LoggerFactory.getLogger(SessionComputeTrigger.class);
>     private final ReducingStateDescriptor<Long> countDesc = new
> ReducingStateDescriptor<>("count", new SessionComputeTrigger.Sum(),
> LongSerializer.INSTANCE);
>
>     private final ValueStateDescriptor<Boolean> existDesc = new
> ValueStateDescriptor<>("exist", BooleanSerializer.INSTANCE);
>
>     private SessionComputeTrigger() {
>     }
>
>     @Override
>     public TriggerResult onElement(SessionConvertBean element, long
> timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
>         ValueState<Boolean> exist = ctx.getPartitionedState(existDesc);
>         ReducingState<Long> count = ctx.getPartitionedState(countDesc);
>
>         if (exist == null || exist.value() == null || !exist.value()) {
>             count.add(1L);
>             logger.info("mau fire =====>exits:{}
> ====>count:{}====>date_time:{}", exist.value(), count.get(),
> element.dateTime);
>             exist.update(true);
>             return TriggerResult.FIRE;
>         }
>
>         if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
>             // if the watermark is already past the window fire immediately
>             return TriggerResult.FIRE;
>         } else {
>             ctx.registerEventTimeTimer(window.maxTimestamp());
>             return TriggerResult.CONTINUE;
>         }
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window,
> TriggerContext ctx) {
>         return time == window.maxTimestamp() ? TriggerResult.FIRE :
> TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window,
> TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public void clear(TimeWindow window, TriggerContext ctx) throws
> Exception {
>         ctx.deleteEventTimeTimer(window.maxTimestamp());
>     }
>
>     @Override
>     public boolean canMerge() {
>         return true;
>     }
>
>     @Override
>     public void onMerge(TimeWindow window,
>                         OnMergeContext ctx) {
>         long windowMaxTimestamp = window.maxTimestamp();
>         if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
>             ctx.registerEventTimeTimer(windowMaxTimestamp);
>         }
>     }
>
>     @Override
>     public String toString() {
>         return "EventTimeTrigger()";
>     }
>
>     public static SessionComputeTrigger create() {
>         return new SessionComputeTrigger();
>     }
>
>     private static class Sum implements ReduceFunction<Long> {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public Long reduce(Long value1, Long value2) throws Exception {
>             logger.info("value1={},value2={}", value1, value2);
>             return value1 + value2;
>         }
>
>     }
> }
>
> 输出值:
>
> mau fire =====>exits:null
> ====>count:1====>date_time:2020-05-10T23:22:40.298+0800
> (date_time是当前记录的eventtime)
>
> result=>用户:48b132cb-f40b-4cf5-bf7f-b6411525b1982,开始时间:2020-05-10T23:22:40.298+0800,结束时间:2020-05-10T23:23:10.298+0800,间隔:30000.0,总数:1
> mau fire =====>exits:null
> ====>count:1====>date_time:2020-05-10T23:22:43.298+0800
>
>  result=>用户:48b132cb-f40b-4cf5-bf7f-b6411525b1982,开始时间:2020-05-10T23:22:40.298+0800,结束时间:2020-05-10T23:23:13.298+0800,间隔:33000.0,总数:2
> mau fire =====>exits:null
> ====>count:1====>date_time:2020-05-10T23:22:46.298+0800
>
>  result=>用户:48b132cb-f40b-4cf5-bf7f-b6411525b1982,开始时间:2020-05-10T23:22:40.298+0800,结束时间:2020-05-10T23:23:16.298+0800,间隔:36000.0,总数:3
> mau fire =====>exits:null
> ====>count:1====>date_time:2020-05-10T23:22:49.298+0800
>
>
>
> 在 2020/5/11 下午4:12,“Benchao Li”<user-zh-return-3480-gougang_1991=
> [hidden email] 代表 [hidden email]> 写入:
>
>     这个暂时应该是没有办法做到这一点。或者你可以用两个query来实现这个?
>     比如一个query是统计first_value;第二个是真正的session window。
>
>     gang.gou <[hidden email]> 于2020年5月11日周一 下午3:07写道:
>
>     > Hi,
>     >
>     >     我想使用Flink的Session Window
>     >
> 去统计一个用户的在线访问时长,和统计当前活跃的用户数量,目前只能在window结束的时候才输出一条包含窗口开始和结束时间的记录,怎么在窗口创建的时候就先输出一条记录,结束的时候再去更新这条记录呢,谢谢!
>     >
>     >
>
>     --
>
>     Benchao Li
>     School of Electronics Engineering and Computer Science, Peking
> University
>     Tel:+86-15650713730
>     Email: [hidden email]; [hidden email]
>
>
>

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Flink Session Window在窗口创建的时候输出一条数据

苟刚
好的,我再研究研究,谢谢

在 2020/5/12 下午12:01,“Benchao Li”<user-zh-return-3495-gougang_1991=[hidden email] 代表 [hidden email]> 写入:

    看起来你使用的应该不是blink planner,因为blink planner里的Trigger跟DataStream里面的Trigger是不一样的。
    所以如果你用的是legacy planner,可能这个方法是可行的。(我对legacy planner不熟悉)
    但是如果你以后转到blink planner,这个应该是搞不了的。

    gang.gou <[hidden email]> 于2020年5月12日周二 上午11:18写道:

    > 我想通过trigger的方式来实现这个需求,想法是重写EventTimeTrigger
    > ,在首条记录进入系统时,触发一下purge,通过ValueStatus
    > 记录状态。但是现在遇到的问题是,会重复触发多少。而且窗口关闭时的触发,WindowFunction
    > Function收到的记录也不对。请问是我使用方式不对吗?
    >
    > 自定义的trigger:
    > public class SessionComputeTrigger extends Trigger<SessionConvertBean,
    > TimeWindow> {
    >     private static final long serialVersionUID = 1L;
    >     static Logger logger =
    > LoggerFactory.getLogger(SessionComputeTrigger.class);
    >     private final ReducingStateDescriptor<Long> countDesc = new
    > ReducingStateDescriptor<>("count", new SessionComputeTrigger.Sum(),
    > LongSerializer.INSTANCE);
    >
    >     private final ValueStateDescriptor<Boolean> existDesc = new
    > ValueStateDescriptor<>("exist", BooleanSerializer.INSTANCE);
    >
    >     private SessionComputeTrigger() {
    >     }
    >
    >     @Override
    >     public TriggerResult onElement(SessionConvertBean element, long
    > timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    >         ValueState<Boolean> exist = ctx.getPartitionedState(existDesc);
    >         ReducingState<Long> count = ctx.getPartitionedState(countDesc);
    >
    >         if (exist == null || exist.value() == null || !exist.value()) {
    >             count.add(1L);
    >             logger.info("mau fire =====>exits:{}
    > ====>count:{}====>date_time:{}", exist.value(), count.get(),
    > element.dateTime);
    >             exist.update(true);
    >             return TriggerResult.FIRE;
    >         }
    >
    >         if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
    >             // if the watermark is already past the window fire immediately
    >             return TriggerResult.FIRE;
    >         } else {
    >             ctx.registerEventTimeTimer(window.maxTimestamp());
    >             return TriggerResult.CONTINUE;
    >         }
    >     }
    >
    >     @Override
    >     public TriggerResult onEventTime(long time, TimeWindow window,
    > TriggerContext ctx) {
    >         return time == window.maxTimestamp() ? TriggerResult.FIRE :
    > TriggerResult.CONTINUE;
    >     }
    >
    >     @Override
    >     public TriggerResult onProcessingTime(long time, TimeWindow window,
    > TriggerContext ctx) throws Exception {
    >         return TriggerResult.CONTINUE;
    >     }
    >
    >     @Override
    >     public void clear(TimeWindow window, TriggerContext ctx) throws
    > Exception {
    >         ctx.deleteEventTimeTimer(window.maxTimestamp());
    >     }
    >
    >     @Override
    >     public boolean canMerge() {
    >         return true;
    >     }
    >
    >     @Override
    >     public void onMerge(TimeWindow window,
    >                         OnMergeContext ctx) {
    >         long windowMaxTimestamp = window.maxTimestamp();
    >         if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
    >             ctx.registerEventTimeTimer(windowMaxTimestamp);
    >         }
    >     }
    >
    >     @Override
    >     public String toString() {
    >         return "EventTimeTrigger()";
    >     }
    >
    >     public static SessionComputeTrigger create() {
    >         return new SessionComputeTrigger();
    >     }
    >
    >     private static class Sum implements ReduceFunction<Long> {
    >         private static final long serialVersionUID = 1L;
    >
    >         @Override
    >         public Long reduce(Long value1, Long value2) throws Exception {
    >             logger.info("value1={},value2={}", value1, value2);
    >             return value1 + value2;
    >         }
    >
    >     }
    > }
    >
    > 输出值:
    >
    > mau fire =====>exits:null
    > ====>count:1====>date_time:2020-05-10T23:22:40.298+0800
    > (date_time是当前记录的eventtime)
    >
    > result=>用户:48b132cb-f40b-4cf5-bf7f-b6411525b1982,开始时间:2020-05-10T23:22:40.298+0800,结束时间:2020-05-10T23:23:10.298+0800,间隔:30000.0,总数:1
    > mau fire =====>exits:null
    > ====>count:1====>date_time:2020-05-10T23:22:43.298+0800
    >
    >  result=>用户:48b132cb-f40b-4cf5-bf7f-b6411525b1982,开始时间:2020-05-10T23:22:40.298+0800,结束时间:2020-05-10T23:23:13.298+0800,间隔:33000.0,总数:2
    > mau fire =====>exits:null
    > ====>count:1====>date_time:2020-05-10T23:22:46.298+0800
    >
    >  result=>用户:48b132cb-f40b-4cf5-bf7f-b6411525b1982,开始时间:2020-05-10T23:22:40.298+0800,结束时间:2020-05-10T23:23:16.298+0800,间隔:36000.0,总数:3
    > mau fire =====>exits:null
    > ====>count:1====>date_time:2020-05-10T23:22:49.298+0800
    >
    >
    >
    > 在 2020/5/11 下午4:12,“Benchao Li”<user-zh-return-3480-gougang_1991=
    > [hidden email] 代表 [hidden email]> 写入:
    >
    >     这个暂时应该是没有办法做到这一点。或者你可以用两个query来实现这个?
    >     比如一个query是统计first_value;第二个是真正的session window。
    >
    >     gang.gou <[hidden email]> 于2020年5月11日周一 下午3:07写道:
    >
    >     > Hi,
    >     >
    >     >     我想使用Flink的Session Window
    >     >
    > 去统计一个用户的在线访问时长,和统计当前活跃的用户数量,目前只能在window结束的时候才输出一条包含窗口开始和结束时间的记录,怎么在窗口创建的时候就先输出一条记录,结束的时候再去更新这条记录呢,谢谢!
    >     >
    >     >
    >
    >     --
    >
    >     Benchao Li
    >     School of Electronics Engineering and Computer Science, Peking
    > University
    >     Tel:+86-15650713730
    >     Email: [hidden email]; [hidden email]
    >
    >
    >

    --

    Benchao Li
    School of Electronics Engineering and Computer Science, Peking University
    Tel:+86-15650713730
    Email: [hidden email]; [hidden email]