flink 1.11 SQL DDL reading from Kafka: the event-time window never fires, while the same SQL with processing time fires fine


李世钰
Hello, I'd like to ask a question.
With Flink 1.11 SQL DDL reading from Kafka, the window never fires when I use event time, but it fires normally when I use process_time (processing time):
create table kafka_table (
`log_id`  string,
event_date timestamp(3),
process_time as PROCTIME(),
ts as event_date,
watermark for ts as ts - interval '1' second
) with (
 'connector' = 'kafka',
 'topic' = 'kafka_table',
 'properties.bootstrap.servers' = '10.2.12.3:9092',
 'properties.group.id' = 'tmp-log-consumer003',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)
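For reference, a message on this topic would need to look roughly like the sketch below. The field names come from the DDL and the aggregation query (src_ip / dest_ip); the values are made up, and the timestamp layout assumes the JSON format's default 'SQL' timestamp standard ('yyyy-MM-dd HH:mm:ss.SSS').

```python
import json
from datetime import datetime

# Hypothetical message for the kafka_table topic. Field names follow the
# DDL and the query (src_ip / dest_ip); the values here are invented.
# With 'json.timestamp-format.standard' = 'SQL' (the default), a
# TIMESTAMP(3) column is parsed from 'yyyy-MM-dd HH:mm:ss.SSS'.
record = {
    "log_id": "13547876357",
    "event_date": "2020-11-07 08:23:09.806",
    "src_ip": "10.2.12.100",
    "dest_ip": "10.2.12.200",
}
payload = json.dumps(record)
print(payload)

# Sanity-check that event_date matches the SQL timestamp pattern.
datetime.strptime(record["event_date"], "%Y-%m-%d %H:%M:%S.%f")
```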
The SQL executed is:
select
  TUMBLE_START(kafka_table.event_date, INTERVAL '10' SECOND),
  TUMBLE_END(kafka_table.event_date, INTERVAL '10' SECOND),
  src_ip,
  count(dest_ip)
from kafka_table
group by
  TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),
  kafka_table.src_ip

The schema returned by select log_id,process_time,ts from kafka_table is:
root
 |-- log_id: STRING
 |-- process_time: TIMESTAMP(3) NOT NULL *PROCTIME*
 |-- ts: TIMESTAMP(3) *ROWTIME*


The input data is:
log_id,process_time,ts
13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806
13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806
13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806
13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806
13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806
13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806

Re: flink 1.11 SQL DDL reading from Kafka: the event-time window never fires, while the same SQL with processing time fires fine

Jark
Administrator
This is a duplicate question; I'm pasting my earlier answer here as well.

To test event-time windows, make sure all of the following hold, otherwise the window will not fire:
1. Every partition has data.
2. The event time in every partition keeps advancing.
3. The event time advances by more than window size + watermark offset, i.e. 10s + 1s = 11s in your example.

If any of these does not hold, the system does not consider the window complete, so the window never fires.
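The arithmetic can be checked with a small sketch. It assumes tumbling windows aligned to the epoch, and uses the sample rows above, which all carry the same event time 2020-11-07T08:23:09.806:

```python
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=10)   # TUMBLE ... INTERVAL '10' SECOND
OFFSET = timedelta(seconds=1)    # watermark for ts as ts - interval '1' second

def window_end(event_time: datetime) -> datetime:
    # Tumbling windows aligned to the epoch: [start, start + WINDOW).
    epoch = datetime(1970, 1, 1)
    n = (event_time - epoch) // WINDOW
    return epoch + (n + 1) * WINDOW

# Every sample row carries the same event time, so the watermark stalls here.
ts = datetime(2020, 11, 7, 8, 23, 9, 806000)
watermark = ts - OFFSET          # 08:23:08.806, and it never advances
end = window_end(ts)             # 08:23:10.000

print(watermark >= end)          # False -> the window never fires

# A later record must push event time past window end + offset to fire it:
needed = end + OFFSET            # 08:23:11.000
```

With only the six sample rows, the watermark stays 1.194s short of the window end, which is exactly why processing time "works" while event time appears stuck.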

Best,
Jark

On Sat, 14 Nov 2020 at 16:35, 李世钰 <[hidden email]> wrote:
