Flink CEP job state keeps growing

RL_LEE
I'm currently running some performance tests with Flink CEP. The Flink version is 1.13.

The CEP logic is as follows:

INSERT INTO TO_KAFKA
SELECT bizName, wdName, wdValue , zbValue , flowId FROM
SOURCE_KAFKA MATCH_RECOGNIZE
(
    PARTITION BY flow_id
    ORDER BY proctime
    MEASURES A.biz_name as bizName, A.wd_name as wdName, A.wd_value as wdValue, MAP[
        A.zb_name, A.zb_value,
        B.zb_name, B.zb_value
    ] as zbValue, A.flow_id as flowId
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN ( A B ) WITHIN INTERVAL '10' SECOND
    DEFINE
    B AS B.flow_id = A.flow_id
);

To prevent the CEP state from growing indefinitely, I added a WITHIN time limit so that unmatched state is kept for only 10 seconds.
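To make the intended WITHIN behavior concrete, here is a toy Python model of `PATTERN (A B) WITHIN INTERVAL '10' SECOND` partitioned by `flow_id`. It is not Flink's implementation: the class `ToyPatternAB` and its methods are hypothetical. It rests on one explicit assumption: a key's partial match is checked for expiry only when another row for that same key arrives, and nothing sweeps idle keys. Under that assumption, a flow that sends a single row leaves its `A` pending indefinitely. State then grows with the number of distinct `flow_id` values, even though every such partial match is logically older than 10 s.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple

# Mirrors WITHIN INTERVAL '10' SECOND from the query above.
WITHIN_MS = 10_000


@dataclass
class ToyPatternAB:
    """Toy model (not Flink code) of PATTERN (A B) WITHIN 10s, PARTITION BY key.

    Illustrative assumption: a key's partial match is checked for expiry only
    when a new event for that same key arrives; nothing sweeps idle keys.
    """

    # key -> (timestamp of the A row in ms, the A row itself)
    pending: Dict[str, Tuple[int, dict]] = field(default_factory=dict)

    def on_event(self, key: str, ts_ms: int, row: dict) -> Optional[Tuple[dict, dict]]:
        started = self.pending.get(key)
        # WITHIN: drop this key's partial match if its window has elapsed.
        if started is not None and ts_ms - started[0] >= WITHIN_MS:
            del self.pending[key]
            started = None
        if started is not None:
            # This row is B: emit (A, B) and, like AFTER MATCH SKIP PAST
            # LAST ROW, discard the key's partial match.
            del self.pending[key]
            return started[1], row
        # Otherwise this row becomes A and waits for a B.
        self.pending[key] = (ts_ms, row)
        return None

    def state_size(self) -> int:
        # Number of keys currently holding a pending A.
        return len(self.pending)


if __name__ == "__main__":
    toy = ToyPatternAB()
    # A and B for the same flow within 10 s: one match, no state left.
    print(toy.on_event("flow-a", 0, {"zb": 1}))      # None
    print(toy.on_event("flow-a", 5_000, {"zb": 2}))  # ({'zb': 1}, {'zb': 2})
    # Many flows that each send a single row: every A stays pending,
    # because no later row for those keys ever triggers the expiry check.
    for i in range(1_000):
        toy.on_event(f"flow-{i}", 20_000 + i, {"zb": i})
    print(toy.state_size())  # 1000
```

Whether Flink 1.13's processing-time `MATCH_RECOGNIZE` actually prunes state this way is an assumption to check against the CEP operator, not something this sketch establishes.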

In practice, however, the checkpointed state keeps growing while the job runs, as if the state were not being cleared as configured.

Is there an effective way to deal with this?