回复: 根据业务需求选择合适的flink state

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

回复: 根据业务需求选择合适的flink state

纪军伟
退订


| |
纪军伟
|
|
[hidden email]
|
签名由网易邮箱大师定制


在2021年01月23日 15:43,徐州州<[hidden email]> 写道:
我觉得你可以尝试一下TTL,keyby之后设置key状态的失效时间为1分钟,如果一分钟没数据进来就清空state。




------------------&nbsp;原始邮件&nbsp;------------------
发件人: "张锴"<[hidden email]&gt;;
发送时间: 2021年1月22日(星期五) 下午5:04
收件人: "user-zh"<[hidden email]&gt;;
主题: Re: 根据业务需求选择合适的flink state



@赵一旦
可以添加一下微信好友吗,具体的实践上还有点问题,我是在window后直接reduce(new myReduceFunc(),new
AssignWindowProcessFunc())自定义了这两个方法,但是效果还是有点问题,不知道我的写法是不是有问题

赵一旦 <[hidden email]&gt; 于2021年1月22日周五 上午10:10写道:

&gt; 我理解你要的最终mysql结果表是:
&gt; 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);....
&gt;
&gt; 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。
&gt;
&gt;
&gt; 如上按照我的方案就可以实现哈。
&gt;
&gt; xuhaiLong <[hidden email]&gt; 于2021年1月22日周五 上午10:03写道:
&gt;
&gt; &gt; 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
&gt; &gt; userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum
&gt; 试试?
&gt; &gt;
&gt; &gt;
&gt; &gt; 在2021年1月21日 18:24,张锴<[hidden email]&gt; 写道:
&gt; &gt; 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
&gt; &gt;
&gt; &gt;
&gt; context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
&gt; &gt; 下面是我的部分代码逻辑:
&gt; &gt;
&gt; &gt; val ds = dataStream
&gt; &gt; .filter(_.liveType == 1)
&gt; &gt; .keyBy(1, 2)
&gt; &gt; .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
&gt; &gt; .process(new myProcessWindow()).uid("process-id")
&gt; &gt;
&gt; &gt; class myProcessWindow() extends
&gt; &gt; ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
&gt; &gt; TimeWindow] {
&gt; &gt;
&gt; &gt; override def process(key: Tuple, context: Context, elements:
&gt; &gt; Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
&gt; &gt; = {
&gt; &gt; var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
&gt; &gt; var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
&gt; &gt;
&gt; &gt; val currentDate = DateUtil.currentDate
&gt; &gt; val created_time = currentDate
&gt; &gt; val modified_time = currentDate
&gt; &gt; 。。。
&gt; &gt;
&gt; &gt; val join_time: String =
&gt; &gt; DateUtil.convertTimeStamp2DateStr(startTime,
&gt; &gt; DateUtil.SECOND_DATE_FORMAT)
&gt; &gt; val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
&gt; &gt; DateUtil.SECOND_DATE_FORMAT)
&gt; &gt; val duration = (endTime - startTime) / 1000&nbsp; //停留多少秒
&gt; &gt; val duration_time = DateUtil.secondsToFormat(duration)&nbsp; //停留时分秒
&gt; &gt; out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
&gt; &gt; courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
&gt; &gt; join_time, leave_time, created_time, modified_time
&gt; &gt; , liveType, plat_form, duration, duration_time,
&gt; &gt; network_operator, role, useragent, uid, eventTime))
&gt; &gt;
&gt; &gt; CloudliveWatcher(id, partnerId, courseId, customerId,
&gt; &gt; courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
&gt; &gt; join_time, leave_time, created_time, modified_time
&gt; &gt; , liveType, plat_form, duration, duration_time,
&gt; &gt; network_operator, role, useragent, uid, eventTime)
&gt; &gt;
&gt; &gt; }
&gt; &gt;
&gt; &gt;
&gt; &gt; 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; 赵一旦 <[hidden email]&gt; 于2020年12月28日周一 下午7:12写道:
&gt; &gt;
&gt; &gt; 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
&gt; &gt;
&gt; &gt; 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
&gt; &gt;
&gt; &gt; session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
&gt; &gt;
&gt; &gt;
&gt; &gt; 张锴 <[hidden email]&gt; 于2020年12月28日周一 下午5:35写道:
&gt; &gt;
&gt; &gt; 能描述一下用session window的考虑吗
&gt; &gt;
&gt; &gt; Akisaya <[hidden email]&gt; 于2020年12月28日周一 下午5:00写道:
&gt; &gt;
&gt; &gt; 这个可以用 session window 吧
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
&gt; &gt;
&gt; &gt; [hidden email] <[hidden email]&gt; 于2020年12月28日周一 下午2:15写道:
&gt; &gt;
&gt; &gt; 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; [hidden email]
&gt; &gt;
&gt; &gt; 发件人: 张锴
&gt; &gt; 发送时间: 2020-12-28 13:35
&gt; &gt; 收件人: user-zh
&gt; &gt; 主题: 根据业务需求选择合适的flink state
&gt; &gt; 各位大佬帮我分析下如下需求应该怎么写
&gt; &gt;
&gt; &gt; 需求说明:
&gt; &gt;
&gt; &gt; 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
&gt; &gt;
&gt; &gt; 我的想法:
&gt; &gt; 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
&gt; &gt; 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
&gt; &gt;
&gt; &gt; 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
&gt; &gt;
&gt; &gt; flink 版本1.10.1
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt;