Hello, I have a question.

With Flink 1.11 SQL, using a DDL-defined Kafka connector, windows are never triggered when I use event time, but they fire normally when I use the processing-time attribute (PROCTIME). The table definition is:

create table kafka_table (
  `log_id` string,
  event_date timestamp(3),
  process_time as PROCTIME(),
  ts as event_date,
  watermark for ts as ts - interval '1' second
) with (
  'connector' = 'kafka',
  'topic' = 'kafka_table',
  'properties.bootstrap.servers' = '10.2.12.3:9092',
  'properties.group.id' = 'tmp-log-consumer003',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
)

The query being executed is:

select TUMBLE_START(kafka_table.event_date, INTERVAL '10' SECOND),
       TUMBLE_END(kafka_table.event_date, INTERVAL '10' SECOND),
       src_ip,
       count(dest_ip)
from kafka_table
group by TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND), kafka_table.src_ip

The schema reported for "select log_id, process_time, ts from kafka_table" is:

root
 |-- log_id: STRING
 |-- process_time: TIMESTAMP(3) NOT NULL *PROCTIME*
 |-- ts: TIMESTAMP(3) *ROWTIME*

The input data is:

log_id,process_time,ts
13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806
13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806
13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806
13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806
13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806
13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806
This is a duplicate question; I am pasting my earlier answer here as well.
If you want to test event-time windows, make sure all of the following hold; otherwise the window will not be triggered:

1. Every Kafka partition has data.
2. The event time in every partition keeps advancing.
3. The event time advances by more than the window size plus the watermark offset, i.e. 10s + 1s = 11s in your example.

If these conditions are not met, the system does not consider the window complete, so the window will not fire.

Best,
Jark

On Sat, 14 Nov 2020 at 16:35, 李世钰 <[hidden email]> wrote:
> (original question quoted above)
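The arithmetic behind condition 3 can be sketched as follows. This is a minimal illustration of Flink's general watermark rule (a window fires once watermark >= window end, where the watermark here lags the maximum seen event time by the 1-second offset from the DDL); it is not Flink code, and the helper names are my own. Applied to the data above, every row carries the same ts, so the watermark is stuck at 08:23:08.806 and never passes the window end 08:23:10:

from datetime import datetime, timedelta

WINDOW_SIZE = timedelta(seconds=10)      # TUMBLE ... INTERVAL '10' SECOND
WATERMARK_OFFSET = timedelta(seconds=1)  # watermark for ts as ts - interval '1' second

def watermark(max_event_time):
    # Watermark lags the largest event time seen so far by the offset.
    return max_event_time - WATERMARK_OFFSET

def window_end(event_time):
    # End of the 10s tumbling window that contains event_time
    # (windows are aligned to the epoch).
    epoch = datetime(1970, 1, 1)
    n = (event_time - epoch) // WINDOW_SIZE
    return epoch + (n + 1) * WINDOW_SIZE

def window_fires(max_event_time, event_time):
    # An event-time window fires once the watermark reaches its end.
    return watermark(max_event_time) >= window_end(event_time)

# All six input rows share ts = 2020-11-07T08:23:09.806.
ts = datetime(2020, 11, 7, 8, 23, 9, 806000)
print(window_fires(ts, ts))   # watermark 08:23:08.806 < 08:23:10 -> False

# Event time must advance past window end + offset (>= 11s here).
print(window_fires(ts + timedelta(seconds=11), ts))   # -> True

So with the given input, new rows whose ts keeps moving forward (by more than 11 seconds in total) are needed before the first window can emit a result.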