source是kafka,有一个rowtime定义:
.field("rowtime", DataTypes.TIMESTAMP(0))
.rowtime(Rowtime()
.timestamps_from_field("actionTime")
.watermarks_periodic_bounded(60000)
)
有两个sink,第一个sink是直接把kafa的数据保存到postgres。
第二个sink是定义一个1小时的tumble window,然后定义了一个udf,udf里面去查询第一个sink保存的数据。
st_env.scan("source") \
.window(Tumble.over("1.hour").on("rowtime").alias("hourlywindow"))
\
.group_by("hourlywindow") \
.select("udf(...)")
...
现在的问题是:第二个sink的tumble window触发的时候,数据库里面的数据已经保存了下一个小时的数据了。
有什么办法让tumble window在一个小时结束后马上触发?现在观察的是需要下一个小时的数据来了,才能触发上一个小时的窗口。
--
Sent from:
http://apache-flink.147419.n8.nabble.com/