我也想知道,我看文档,目前pyflink似乎还不支持processfunction
在2021年03月08日 19:03,kk 写道:
hi,all:
一账号一段时间内连续操作为一个pv,间隔时间超过阈值后会记为新的pv。系统需要获取流式日志,使用日志统计实时数据的各项指标。但是我们在使用session
window的时候无法使用udaf(自定义聚合函数)对相同pv日志进行聚合统计。
希望知道的大佬能给点建议。感谢!!!
session_window = Session.with_gap("60.second").on("pv_time").alias("w")
t_env.from_path('source') \
.window(session_window) \
.group_by("w,pv_id") \
.select("pv_id,get_act(act)").insert_into("sink")
<
http://apache-flink.147419.n8.nabble.com/file/t1355/infoflow_2021-03-08_19-02-16.png>--
Sent from:
http://apache-flink.147419.n8.nabble.com/