我目前在使用Flink CEP进行一些性能测试,Flink版本是1.13
CEP的逻辑如下:
INSERT INTO TO_KAFKA
SELECT bizName, wdName, wdValue , zbValue , flowId FROM
SOURCE_KAFKA MATCH_RECOGNIZE
(
PARTITION BY flow_id
ORDER BY proctime
MEASURES A.biz_name as bizName, A.wd_name as wdName, A.wd_value as wdValue, MAP[
A.zb_name, A.zb_value,
B.zb_name, B.zb_value
] as zbValue, A.flow_id as flowId
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN ( A B ) WITHIN INTERVAL '10' SECOND
DEFINE
B AS B.flow_id = A.flow_id
);
为了防止CEP的状态无限增长,我添加了 WITHIN 时间限制,只保留10S未匹配的状态。
实际任务运行过程中发现,checkpoint状态持续增长,似乎状态没有按照预想的设置进行清除:
有什么有效手段能处理这种情况吗?