kafka 数据格式:
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 06:39:05.088"} {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 06:47:34.609"} *kafka ddl :* CREATE TABLE washroom_detail ( building_id STRING, sofa_id STRING, floor_num INT, occupy_status INT, start_time BIGINT, end_time BIGINT, process_time TIMESTAMP, occupy_times as concat(date_format(TIMESTAMPADD(hour, 8, cast(start_time / 1000 as timestamp)), 'HH:mm'), '-', date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), 'HH:mm')), local_date as date_format(cast(start_time / 1000 as timestamp), 'yyyy-MM-dd'), day_hour as cast(date_format(cast(start_time / 1000 as timestamp), 'HH') as INT) + 8 ) WITH ( 'connector' = 'kafka', 'topic' = 'xxxxxxxx', 'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxx', 'properties.group.id' = 'xxxxxxxxxxxx', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); *mysql ddl:* create table hour_ddl ( building_id STRING, sofa_id STRING, local_date STRING, `hour` INT, floor_num INT, occupy_frequency INT, occupy_times STRING, update_time TIMESTAMP, process_time TIMESTAMP, primary key (building_id, sofa_id, local_date, `hour`) NOT ENFORCED ) with ( 'connector' = 'jdbc', 'url' = 'xxxxxxxx', 'table-name' = 'xxxxxxxx', 'username' = 'xxxxx' 'password' = 'xxxxxx' ) *flink sql dml:* INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num, occupy_frequency, occupy_times, update_time, process_time) SELECT a.building_id, a.sofa_id, a.local_date, a.day_hour, a.floor_num, CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, b.occupy_frequency) AS INT), concat(if(b.occupy_times IS NULL, '', b.occupy_times), if(b.occupy_times IS NULL, a.times, concat(',', a.times))), NOW(), a.process_time FROM (SELECT building_id, sofa_id, local_date, day_hour, floor_num, count(1) AS frequency, LISTAGG(occupy_times) AS times, MAX(process_time) AS process_time, PROCTIME() AS compute_time FROM washroom_detail GROUP BY building_id, sofa_id, local_date, day_hour, floor_num) a LEFT JOIN hour_ddl FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id AND a.sofa_id = b.sofa_id AND a.local_date = b.local_date AND a.day_hour = b.`hour` WHERE a.process_time > b.process_time OR b.process_time IS NULL 现象: 当mysql 没有数据时,插入一条记录 occupy_frequency occupy_times 1 15:01-15:03 当主键冲突时 occupy_frequency occupy_times 3 15:01-15:03,15:01-15:03,15:03-15:04 希望应该是 occupy_frequency occupy_times 2 15:01-15:03,15:03-15:04 |
Free forum by Nabble | Edit this page |