flink1.9.1 cep 出现反压及内存问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

flink1.9.1 cep 出现反压及内存问题

yitian_song
Hi all,我在使用flink cep的过程中遇到了如下问题(一个 A FollowedBy B 的 case)

Case1. 我构造的数据源中全是事件A,全速发送

该任务在执行数秒后阻塞(task之间不再有数据交换),源端的backpressure=1,一段时间后TM失去心跳响应,任务异常终止,此时TaskManager进程存活,但通过jstat观察内存使用情况后,发现内存被耗尽,且不停的FullGC

Case2. 我构造的数据源中交叉发送事件A和事件B,全速发送
    该任务能够正常执行,但源端的backpressure=1,且整个任务的处理性能非常低,大约只有不到5000的eps

问题如下:
    1. Flink CEP是否存在潜在的性能问题?
    2. 为什么会出现OOM,是否使用不当导致的
    3. 总数据量的大小应该小于3GB,是否是由于cep operator
state累积导致的OOM?如果是的话有没有办法可以解决(使用rocksdb state?)

FYI:
   数据类型是JSONObject(fast-json),单条大小约20Byte,累计发送数据15,000,000条
   事件时间为真实系统时间,由于发送线程未设置间隔,因此理论上所有数据的时间应当很接近(为了模拟高eps的场景)
   Flink版本是1.9.1
   集群由1个JobManager和2个TaskManager组成,单个TaskManager的heap大约16GB
   并发度为5
   使用Flink SQL MATCH_RECOGNIZE关键字来实现相同case时,没有出现内存溢出的问题,且反压相对正常(0.4-0.5)

代码如下

```java
        final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        /**
         * {"event_name": "事件A", "src_address": "1.1.1.1", "occur_time":
1574934481585, "payload": "..."}
         * {"event_name": "事件B", "src_address": "2.2.2.2", "occur_time":
1574934481586, "payload": "..."}
         */
        DataStream<JSONObject> stream = env.addSource(new
IncreasingFastJsonSource(15_000_000, 10))
                .assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(3000))
{
                    @Override
                    public long extractTimestamp(JSONObject element) {
                        return element.getLongValue("occur_time");
                    }
                });

        stream = stream.map(element -> element.fluentRemove("payload"))
                .keyBy(element -> element.getIntValue("key"));

        AfterMatchSkipStrategy skipStrategy =
AfterMatchSkipStrategy.skipPastLastEvent();
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>
                begin("A", skipStrategy).where(new
SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws
Exception {
                        return
StringUtils.equals(value.getString("event_name"), "事件A");
                    }
                })
                .followedBy("B").where(new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws
Exception {
                        return
StringUtils.equals(value.getString("event_name"), "事件B");
                    }
                })
                .within(Time.seconds(10));

        CEP.pattern(stream, pattern)
                .select(FollowedBy::select)
                .print();

        env.execute("Benchmark");
```