pyflink 如何使用session window对相同pv数据聚合

classic Classic list List threaded Threaded
4 messages Options
kk
Reply | Threaded
Open this post in threaded view
|

pyflink 如何使用session window对相同pv数据聚合

kk
hi,all:
一账号一段时间内连续操作为一个pv,间隔时间超过阈值后会记为新的pv。系统需要获取流式日志,使用日志统计实时数据的各项指标。但是我们在使用session
window的时候无法使用udaf(自定义聚合函数)对相同pv日志进行聚合统计。
希望知道的大佬能给点建议。感谢!!!

session_window = Session.with_gap("60.second").on("pv_time").alias("w")
t_env.from_path('source') \
    .window(session_window) \
    .group_by("w,pv_id") \
    .select("pv_id,get_act(act)").insert_into("sink")

<http://apache-flink.147419.n8.nabble.com/file/t1355/infoflow_2021-03-08_19-02-16.png>



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

回复:pyflink 如何使用session window对相同pv数据聚合

Hongyuan Ma
我也想知道,我看文档,目前pyflink似乎还不支持processfunction


在2021年03月08日 19:03,kk 写道:
hi,all:
一账号一段时间内连续操作为一个pv,间隔时间超过阈值后会记为新的pv。系统需要获取流式日志,使用日志统计实时数据的各项指标。但是我们在使用session
window的时候无法使用udaf(自定义聚合函数)对相同pv日志进行聚合统计。
希望知道的大佬能给点建议。感谢!!!

session_window = Session.with_gap("60.second").on("pv_time").alias("w")
t_env.from_path('source') \
   .window(session_window) \
   .group_by("w,pv_id") \
   .select("pv_id,get_act(act)").insert_into("sink")

<http://apache-flink.147419.n8.nabble.com/file/t1355/infoflow_2021-03-08_19-02-16.png&gt;



--
Sent from: http://apache-flink.147419.n8.nabble.com/
kk
Reply | Threaded
Open this post in threaded view
|

Re: 回复:pyflink 如何使用session window对相同pv数据聚合

kk
我之前测试过slide window,可以使用。就是无法在session window中使用,group windowed table不支持。



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: pyflink 如何使用session window对相同pv数据聚合

Xingbo Huang
In reply to this post by Hongyuan Ma
Hi,
1.12 还不支持session window的udaf,在1.13上将提供这部分的支持,具体可以关注JIRA[1]。
然后,1.12是支持ProcessFunction和KeyedProcessFunction的,具体可以参考代码[2]

[1] https://issues.apache.org/jira/browse/FLINK-21630
[2]
https://github.com/apache/flink/blob/release-1.12/flink-python/pyflink/datastream/functions.py

Best,
Xingbo

Hongyuan Ma <[hidden email]> 于2021年3月8日周一 下午7:10写道:

> 我也想知道,我看文档,目前pyflink似乎还不支持processfunction
>
>
> 在2021年03月08日 19:03,kk 写道:
> hi,all:
>
> 一账号一段时间内连续操作为一个pv,间隔时间超过阈值后会记为新的pv。系统需要获取流式日志,使用日志统计实时数据的各项指标。但是我们在使用session
> window的时候无法使用udaf(自定义聚合函数)对相同pv日志进行聚合统计。
> 希望知道的大佬能给点建议。感谢!!!
>
> session_window = Session.with_gap("60.second").on("pv_time").alias("w")
> t_env.from_path('source') \
>    .window(session_window) \
>    .group_by("w,pv_id") \
>    .select("pv_id,get_act(act)").insert_into("sink")
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t1355/infoflow_2021-03-08_19-02-16.png&gt
> ;
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>