flink sql ddl连接kafka,flink sql使用事件时间无法正常触发时间窗口

classic Classic list List threaded Threaded
2 messages Options
me
Reply | Threaded
Open this post in threaded view
|

flink sql ddl连接kafka,flink sql使用事件时间无法正常触发时间窗口

me
flink版本 flink1.11


flink sql连接kafka
create table kafka_table (
log_id  string,
event_time bigint,
process_time as PROCTIME(),
ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)),
watermark for ts as ts - interval '1' second
) with (
 'connector' = 'kafka',
 'topic' = 'kafka_table',
 'properties.bootstrap.servers' = '10.2.12.3:9092',
 'properties.group.id' = 'tmp-log-consumer003',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)





使用窗口聚合的代码
val tmp = tableEnv.sqlQuery("select HOP_START(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5' SECOND),HOP_END(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5' SECOND),src_ip,count(dest_ip) from kafka_table group by HOP(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5' SECOND),kafka_table.src_ip")


相同的sql使用process_time系统时间就可以成功触发,但是使用事件时间不能触发,
系统时间是11月14日当前时间,事件时间是11月6日,我读取的历史数据往kafka中打数据测试的
求问是什么原因不能触发窗口或者我的用法有什么问题吗
Reply | Threaded
Open this post in threaded view
|

Re: flink sql ddl连接kafka,flink sql使用事件时间无法正常触发时间窗口

Jark
Administrator
如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发:
1. 保证所有 partition 都有数据。
2. 且每个 partition 数据的 event time 都在前进
3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s =
11s

以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。

Best,
Jark

On Sat, 14 Nov 2020 at 15:11, 李世钰 <[hidden email]> wrote:

> flink版本 flink1.11
>
>
> flink sql连接kafka
> create table kafka_table (
> log_id&nbsp; string,
> event_time bigint,
> process_time as PROCTIME(),
> ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)),
> watermark for ts as ts - interval '1' second
> ) with (
> &nbsp;'connector' = 'kafka',
> &nbsp;'topic' = 'kafka_table',
> &nbsp;'properties.bootstrap.servers' = '10.2.12.3:9092',
> &nbsp;'properties.group.id' = 'tmp-log-consumer003',
> &nbsp;'format' = 'json',
> &nbsp;'scan.startup.mode' = 'latest-offset'
> )
>
>
>
>
>
> 使用窗口聚合的代码
> val tmp = tableEnv.sqlQuery("select HOP_START(kafka_table.ts, INTERVAL
> '10' SECOND, INTERVAL '5' SECOND),HOP_END(kafka_table.ts, INTERVAL '10'
> SECOND, INTERVAL '5' SECOND),src_ip,count(dest_ip) from kafka_table group
> by HOP(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5'
> SECOND),kafka_table.src_ip")
>
>
> 相同的sql使用process_time系统时间就可以成功触发,但是使用事件时间不能触发,
> 系统时间是11月14日当前时间,事件时间是11月6日,我读取的历史数据往kafka中打数据测试的
> 求问是什么原因不能触发窗口或者我的用法有什么问题吗