flinksql 1.12.4/1.13.1 hop+count+join 聚合触发间隔不稳定

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

flinksql 1.12.4/1.13.1 hop+count+join 聚合触发间隔不稳定

鱼子酱
遇到一个比较奇怪的问题,请大佬帮忙看看,是否是我忽略了新版本的什么特性?

大概逻辑是,有一个Kafka流表(多个分区),关联一张mysql字典表,关联之后进行聚合操作,
滑动窗口长度3分钟,统计间隔1分钟。

1.11.3版本下,能够稳定计算出结果,每分钟一条数据,符合预期,样例如下。
| request_time                |   …………    | insert_time                |
| 2021-07-29 16:06:00.0 |   …………    | 2021-07-29 16:06:28 |
| 2021-07-29 16:07:00.0 |   …………    | 2021-07-29 16:07:33 |
| 2021-07-29 16:10:00.0 |   …………    | 2021-07-29 16:10:34 |
| 2021-07-29 16:11:00.0 |   …………    | 2021-07-29 16:11:29 |
| 2021-07-29 16:12:00.0 |   …………    | 2021-07-29 16:12:34 |
| 2021-07-29 16:18:00.0 |   …………    | 2021-07-29 16:18:29 |
| 2021-07-29 16:19:00.0 |   …………    | 2021-07-29 16:19:29 |
| 2021-07-29 16:20:00.0 |   …………    | 2021-07-29 16:20:24 |
| 2021-07-29 16:22:00.0 |   …………    | 2021-07-29 16:22:24 |


同样的sql逻辑,迁移到k1.12.4/1.13.1以后,
1、默认配置不能计算出结果,经过分析发现是数据量少,没有数据的Kafka分区影响了watermark,
     增加配置"table.exec.source.idle-timeout","2000 ms"以后,能够计算出结果,
2、但是,当数据量少的时候,并不能按照预期的时间间隔产生结果数据,
     本该每分钟写入一条数据,但是实际上可能很久之后才写库
| request_time                |   …………    | insert_time         |
| 2021-07-29 15:29:00.0 |   …………    | 2021-07-29 15:31:06 |
| 2021-07-29 15:30:00.0 |   …………    | 2021-07-29 15:31:31 |
| 2021-07-29 15:31:00.0 |   …………    | 2021-07-29 15:36:42 |
| 2021-07-29 15:32:00.0 |   …………    | 2021-07-29 15:36:42 |
| 2021-07-29 15:33:00.0 |   …………    | 2021-07-29 15:36:42 |
| 2021-07-29 15:34:00.0 |   …………    | 2021-07-29 15:38:52 |


我的查询逻辑在这里:
    insert into test
    select request_time
    ,province_center_id
    ,count(distinct case when communication_type='aa' then terminalNo else null end)
    ,count(distinct case when communication_type='bb' then terminalNo else null end)
    ,count(distinct case when communication_type='cc' then terminalNo else null end)
    ,'GX' as system_generation
    from
    (select SUBSTRING(DATE_FORMAT(HOP_END(eventTime,INTERVAL '1' MINUTE, INTERVAL '3' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
    ,terminalNo
    ,max(eventTime) as heartbeatTime
    ,province_center_id
    ,communication_type
    from log_kafka inner join terminal_dic for system_time as of log_kafka.proc_time
    on log_kafka.terminalNo= terminal_dic.terminal_no
    group by hop(eventTime,INTERVAL '1' MINUTE, INTERVAL '3' MINUTE),terminalNo,province_center_id,communication_type
    )
    where (UNIX_TIMESTAMP(request_time) - UNIX_TIMESTAMP(DATE_FORMAT(heartbeatTime,'yyyy-MM-dd HH:mm:ss.SSS')) ) < 150
    group by request_time,province_center_id


数据源:kafkasource:
    create table log_kafka (
………………
      eventTime timestamp(3),
………………
      terminalNo varchar,
………………
      proc_time as proctime(),
      watermark for eventTime as eventTime - interval '15' second
    )
    with(
     'connector' = 'kafka',
     'topic' = '…………',
     'properties.bootstrap.servers' = '………………',
     'properties.group.id' = '…………',
     'scan.startup.mode' = 'latest-offset',
     'format' = 'json'
    )