I've run into a rather strange problem and would appreciate some help. Is there some feature of the newer versions that I've overlooked?
The rough logic: a Kafka stream table (multiple partitions) is joined with a MySQL dictionary table, and the join result is aggregated over a sliding (hopping) window, 3 minutes long with a 1-minute slide.

On 1.11.3 this computes results reliably, one row per minute, as expected. Sample:

| request_time          | ………… | insert_time         |
| 2021-07-29 16:06:00.0 | ………… | 2021-07-29 16:06:28 |
| 2021-07-29 16:07:00.0 | ………… | 2021-07-29 16:07:33 |
| 2021-07-29 16:10:00.0 | ………… | 2021-07-29 16:10:34 |
| 2021-07-29 16:11:00.0 | ………… | 2021-07-29 16:11:29 |
| 2021-07-29 16:12:00.0 | ………… | 2021-07-29 16:12:34 |
| 2021-07-29 16:18:00.0 | ………… | 2021-07-29 16:18:29 |
| 2021-07-29 16:19:00.0 | ………… | 2021-07-29 16:19:29 |
| 2021-07-29 16:20:00.0 | ………… | 2021-07-29 16:20:24 |
| 2021-07-29 16:22:00.0 | ………… | 2021-07-29 16:22:24 |

After migrating the same SQL logic to 1.12.4/1.13.1:

1. With the default configuration no results are produced at all. Analysis showed that, because the data volume is low, Kafka partitions that receive no data hold back the watermark. After adding the option "table.exec.source.idle-timeout" = "2000 ms", results are produced again (see the first sketch at the end of this post for how the option was set).

2. However, when the data volume is low, results are still not emitted at the expected interval: a row that should be written every minute may only reach the database much later.

| request_time          | ………… | insert_time         |
| 2021-07-29 15:29:00.0 | ………… | 2021-07-29 15:31:06 |
| 2021-07-29 15:30:00.0 | ………… | 2021-07-29 15:31:31 |
| 2021-07-29 15:31:00.0 | ………… | 2021-07-29 15:36:42 |
| 2021-07-29 15:32:00.0 | ………… | 2021-07-29 15:36:42 |
| 2021-07-29 15:33:00.0 | ………… | 2021-07-29 15:36:42 |
| 2021-07-29 15:34:00.0 | ………… | 2021-07-29 15:38:52 |

My query logic:

insert into test
select
  request_time,
  province_center_id,
  count(distinct case when communication_type = 'aa' then terminalNo else null end),
  count(distinct case when communication_type = 'bb' then terminalNo else null end),
  count(distinct case when communication_type = 'cc' then terminalNo else null end),
  'GX' as system_generation
from (
  select
    SUBSTRING(DATE_FORMAT(HOP_END(eventTime, INTERVAL '1' MINUTE, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:mm:ss.SSS'), 0, 21) as request_time,
    terminalNo,
    max(eventTime) as heartbeatTime,
    province_center_id,
    communication_type
  from log_kafka
  inner join terminal_dic for system_time as of log_kafka.proc_time
    on log_kafka.terminalNo = terminal_dic.terminal_no
  group by hop(eventTime, INTERVAL '1' MINUTE, INTERVAL '3' MINUTE), terminalNo, province_center_id, communication_type
)
where (UNIX_TIMESTAMP(request_time) - UNIX_TIMESTAMP(DATE_FORMAT(heartbeatTime, 'yyyy-MM-dd HH:mm:ss.SSS'))) < 150
group by request_time, province_center_id

Data source (Kafka):

create table log_kafka (
  ………………
  eventTime timestamp(3),
  ………………
  terminalNo varchar,
  ………………
  proc_time as proctime(),
  watermark for eventTime as eventTime - interval '15' second
) with (
  'connector' = 'kafka',
  'topic' = '…………',
  'properties.bootstrap.servers' = '………………',
  'properties.group.id' = '…………',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
)
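For reference, this is roughly how the idle-timeout option from point 1 was added. It is a minimal sketch only, assuming a Java job; the class name is a placeholder and the DDL registration and INSERT INTO are elided:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IdleTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // A source split (here: a Kafka partition) that stays silent for
        // 2 seconds is marked idle and temporarily excluded from the
        // watermark calculation, so empty partitions no longer hold back
        // the window triggers.
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "2000 ms");

        // ... executeSql() calls for the log_kafka / terminal_dic DDL
        //     and the INSERT INTO query go here ...
    }
}

The background is that with multiple partitions the source watermark is the minimum over all per-partition watermarks, so a single partition with no traffic stalls it until the idle timeout kicks in.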
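For comparison, the DataStream API exposes the same idea through WatermarkStrategy#withIdleness. The sketch below is only illustrative; the LogEvent type stands in for the parsed Kafka JSON record and is not part of the real job:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdlenessSketch {

    // Hypothetical event type standing in for the parsed Kafka record.
    public static class LogEvent {
        public long eventTimeMillis;
        public String terminalNo;
    }

    public static WatermarkStrategy<LogEvent> watermarkStrategy() {
        // 15 s bounded out-of-orderness mirrors the DDL watermark
        // "eventTime - interval '15' second"; withIdleness(2 s) mirrors
        // 'table.exec.source.idle-timeout' = '2000 ms': a partition that
        // is silent for 2 s stops holding back the combined watermark.
        return WatermarkStrategy
                .<LogEvent>forBoundedOutOfOrderness(Duration.ofSeconds(15))
                .withTimestampAssigner((event, ts) -> event.eventTimeMillis)
                .withIdleness(Duration.ofSeconds(2));
    }
}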