Hi 大家好,
近期在处理 LEFT JOIN 语句时,发现了一个奇怪的现象:假设有如下 SQL 语句:
CREATE TABLE A (
key INT
) WITH (
'connector' = 'kafka',
);
CREATE TABLE B (
key INT
) WITH (
'connector' = 'kafka',
);
CREATE TABLE Sink (
id INTEGER,
upsert_value BIGINT,
primary key (`id`) NOT ENFORCED
) WITH (
'connector.type' = 'elasticsearch',
'update-mode' = 'upsert', -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式
);
INSERT INTO Sink
SELECT A.key, SUM(B.key)
FROM A LEFT JOIN B ON A.key = B.key
GROUP BY A.key;
用于 LEFT JOIN 的左表叫做 A,右表叫做 B,那么:
场景 1. 如果左表 A 来了一条数据 key=100,在右表 B 中首次没有 JOIN 成功(此时 B 还没有 key=100 的数据),则会向下游 ES Sink 输出 Upsert 消息(true, 100, null)。如果过段时间之后,B 有了 key=100 的数据,此时 Flink 会发出 DELETE 消息(false, 100, null),随后再发送一条 UPSERT 消息(例如 true, 100, 100)更新下游结果。此后无论如何,再也不会输出 DELETE 消息了。
场景 2.
如果左表 A 来了一条数据 key=100,在右表 B 中首次 JOIN 成功(即 B 已经有 key=100 的数据) ,则不会输出 DELETE 消息,而是直接输出 Upsert 消息(true, 100, 100),此后无论如何,再也不会输出 DELETE 消息。
问题:
请问场景 1 中的 LEFT JOIN 输出 Delete 消息是否有必要呢?我理解直接对于场景 1,直接发出 Upsert 消息也可以,Delete 看似用途不大。而且,Delete 消息会造成对应 doc id 中的一些字段被清除(如果之前该 doc 保存有其他 Flink 表中未定义的字段的话),造成字段的意外丢失。
阅读了 GroupAggFunction 的代码,看到有如下的逻辑,请问这个设计是否可以阐述一下是为了避免什么情况呢?非常感谢 :)