Re: Please advise on a time window question, many thanks!

jacky-cui
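What taochang said is right, but I'd add one point. The window is left-closed, right-open: for your hourly window the default window is [17:00, 18:00). Since your watermark delay is 5 seconds, the window fires as soon as (event time - 5 s) >= 18:00. The earlier reply left out the "equal to" case. The other condition for firing is that the window actually contains data.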
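A minimal Java sketch of the firing rule described above. It assumes the Flink 1.11 behaviour where `forBoundedOutOfOrderness` emits a watermark of `maxSeenTimestamp - delay - 1` and the event-time trigger fires once the watermark reaches the window's `maxTimestamp()`, i.e. `end - 1`. The class and method names are hypothetical; it only reproduces the arithmetic, not Flink itself, and the two events after 18:00 are made-up boundary cases.

```java
import java.time.Instant;
import java.time.ZoneId;

/**
 * Hypothetical simulation of the event-time firing rule (not Flink code).
 * Assumes watermark = maxSeenTimestamp - delay - 1 and that a window
 * [start, end) fires once watermark >= end - 1 (its maxTimestamp).
 */
class WindowFiring {
    static final long OUT_OF_ORDERNESS_MS = 5_000L; // Duration.ofSeconds(5) in the question

    /** Watermark after observing the largest event timestamp so far. */
    static long watermark(long maxSeenTimestamp) {
        return maxSeenTimestamp - OUT_OF_ORDERNESS_MS - 1;
    }

    /** True once the watermark reaches the window's last millisecond (end - 1). */
    static boolean fires(long windowEnd, long currentWatermark) {
        return currentWatermark >= windowEnd - 1;
    }

    /** Smallest event timestamp whose watermark fires the window: end + delay. */
    static long firstFiringEventTime(long windowEnd) {
        return windowEnd + OUT_OF_ORDERNESS_MS;
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        long windowEnd = 1598954400000L; // 2020-09-01 18:00:00 (UTC+8), exclusive end
        long[] events = {
            1598951712000L, // 17:15:12, last record before the hour in the question
            1598954404999L, // hypothetical 18:00:04.999
            1598954405000L, // hypothetical 18:00:05.000
        };
        long maxSeen = Long.MIN_VALUE;
        for (long e : events) {
            maxSeen = Math.max(maxSeen, e); // the watermark only moves when an event arrives
            long wm = watermark(maxSeen);
            System.out.println(Instant.ofEpochMilli(e).atZone(shanghai)
                    + " watermark=" + Instant.ofEpochMilli(wm).atZone(shanghai)
                    + " fires=" + fires(windowEnd, wm));
        }
    }
}
```

Under these assumptions the first event that closes [17:00, 18:00) is stamped 18:00:05.000, which is the `>=` case. In the question no record arrived between 17:15:12 and 2020/9/2 15:16:39, so the watermark stayed at 17:15:06.999 until that record came in; `withIdleness` only lets downstream operators stop waiting on an idle input and does not by itself advance event time.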


------------------ Original message ------------------
From: "user-zh" <[hidden email]>;
Sent: Thursday, September 3, 2020, 10:35 AM
To: "user-zh" <[hidden email]>; "[hidden email]" <[hidden email]>;

Subject: Re: Please advise on a time window question, many thanks!



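There is no problem here. Time windows are left-closed, right-open. Following the definition of the getWindowStartWithOffset method in org.apache.flink.streaming.api.windowing.windows.TimeWindow, your window should be the 17:00-18:00 one, but it should not fire exactly at 2020-09-01 18:00:00.0. Because the window is left-closed, right-open, it fires at a time after 2020-09-01 18:00:00.0, for example 2020-09-01 18:00:00.001.

Adding your 5-second watermark delay, it should fire at 2020-09-01 18:00:05.001.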
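The window assignment described above can be checked with a short Java sketch. The one-line formula mirrors, to the best of my knowledge, what `TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)` computes in Flink 1.11; the class name `WindowStart` is hypothetical, and an offset of 0 is assumed because the query does not set one.

```java
/**
 * Hypothetical re-implementation of the formula in
 * TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize); not Flink code.
 */
class WindowStart {
    static final long HOUR_MS = 3_600_000L;

    /** Start of the tumbling window containing timestamp, aligned to the epoch plus offset. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long ts = 1598951712000L;                  // 2020-09-01 17:15:12 (UTC+8)
        long start = windowStart(ts, 0L, HOUR_MS); // offset 0: no alignment offset assumed
        long end = start + HOUR_MS;                // exclusive: the window is [start, end)
        long maxTimestamp = end - 1;               // last millisecond inside the window
        // prints start=1598950800000 end=1598954400000 maxTimestamp=1598954399999
        System.out.println("start=" + start + " end=" + end + " maxTimestamp=" + maxTimestamp);
    }
}
```

Epoch-aligned hourly windows coincide with whole hours in UTC+8, so the 17:15:12 record lands in [17:00, 18:00), while a record stamped exactly 18:00:00.000 would start the next window.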

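On 2020/9/2 15:20, [hidden email] wrote:
> Hi experts: I'm using Flink SQL and want to compute results with an hourly tumble window. The problem is that nothing is triggered when the hour ends. Could you please take a look?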
> // specify the event-time field and generate watermarks
> DataStream<Tuple4<String,String,String,Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
> WatermarkStrategy
> .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
> //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps()
> .withIdleness(Duration.ofSeconds(10)) // meant to keep generating watermarks even when there is no data
> .withTimestampAssigner((event, timestamp)->event.f3));
>
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> tenv.registerDataStream(
> "log",
> withTimestampsAndWatermarksDS,
> "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
>
> String sql = "select appid,eventid,cnt," +
>     "(starttime + interval '8' hour ) as stime," +
>     "(endtime + interval '8' hour ) as etime " +
>     "from (select appid,eventid,count(*) as cnt," +
>     "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
>     "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
>     "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')"; // want the window to close and fire at the top of each hour
>
> Table table = tenv.sqlQuery(sql);
> DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
>
> The output is:
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 I expected the window to close and fire at 2020-09-01 18:00:00.0, but nothing was emitted.
> (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 the window only fired after this record arrived
> ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}
> What went wrong? Many thanks!