咨询一下 LEFT JOIN 产生 DELETE 消息的疑惑

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

咨询一下 LEFT JOIN 产生 DELETE 消息的疑惑

DONG, Weike
Hi 大家好,

近期在处理 LEFT JOIN 语句时,发现了一个奇怪的现象:假设有如下 SQL 语句:

CREATE TABLE A ( 
    key INT
WITH ( 
    'connector' = 'kafka',
);

CREATE TABLE B ( 
  key INT
WITH ( 
    'connector' = 'kafka',
);

CREATE TABLE Sink (
    id INTEGER,
    upsert_value BIGINT,
    primary key (`id`) NOT ENFORCED
WITH (
    'connector.type' = 'elasticsearch'
    'update-mode' = 'upsert',            -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式     
);

INSERT INTO Sink
SELECT A.key, SUM(B.key)
FROM A LEFT JOIN B ON A.key = B.key
GROUP BY A.key;


用于 LEFT JOIN 的左表叫做 A,右表叫做 B,那么:

场景 1. 如果左表 A 来了一条数据 key=100,在右表 B 中首次没有 JOIN 成功(此时 B 还没有 key=100 的数据),则会向下游 ES Sink 输出 Upsert 消息(true, 100, null)。如果过段时间之后,B 有了 key=100 的数据,此时 Flink 会发出 DELETE 消息(false, 100, null),随后再发送一条 UPSERT 消息(例如 true, 100, 100)更新下游结果。此后无论如何,再也不会输出 DELETE 消息了。

场景 2. 如果左表 A 来了一条数据 key=100,在右表 B 中首次 JOIN 成功(即 B 已经有 key=100 的数据) ,则不会输出 DELETE 消息,而是直接输出 Upsert 消息(true, 100, 100),此后无论如何,再也不会输出 DELETE 消息。


问题:

请问场景 1 中的 LEFT JOIN 输出 Delete 消息是否有必要呢?我理解直接对于场景 1,直接发出 Upsert 消息也可以,Delete 看似用途不大。而且,Delete 消息会造成对应 doc id 中的一些字段被清除(如果之前该 doc 保存有其他 Flink 表中未定义的字段的话),造成字段的意外丢失。

阅读了 GroupAggFunction 的代码,看到有如下的逻辑,请问这个设计是否可以阐述一下是为了避免什么情况呢?非常感谢 :)