如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发:
1. 保证所有 partition 都有数据。
2. 且每个 partition 数据的 event time 都在前进
3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s =
11s
以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。
Best,
Jark
On Sat, 14 Nov 2020 at 15:11, 李世钰 <
[hidden email]> wrote:
> flink版本 flink1.11
>
>
> flink sql连接kafka
> create table kafka_table (
> log_id string,
> event_time bigint,
> process_time as PROCTIME(),
> ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)),
> watermark for ts as ts - interval '1' second
> ) with (
> 'connector' = 'kafka',
> 'topic' = 'kafka_table',
> 'properties.bootstrap.servers' = '10.2.12.3:9092',
> 'properties.group.id' = 'tmp-log-consumer003',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> )
>
>
>
>
>
> 使用窗口聚合的代码
> val tmp = tableEnv.sqlQuery("select HOP_START(kafka_table.ts, INTERVAL
> '10' SECOND, INTERVAL '5' SECOND),HOP_END(kafka_table.ts, INTERVAL '10'
> SECOND, INTERVAL '5' SECOND),src_ip,count(dest_ip) from kafka_table group
> by HOP(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5'
> SECOND),kafka_table.src_ip")
>
>
> 相同的sql使用process_time系统时间就可以成功触发,但是使用事件时间不能触发,
> 系统时间是11月14日当前时间,事件时间是11月6日,我读取的历史数据往kafka中打数据测试的
> 求问是什么原因不能触发窗口或者我的用法有什么问题吗