Hi all,我在使用flink cep的过程中遇到了如下问题(一个 A FollowedBy B 的 case)
Case1. 我构造的数据源中全是事件A,全速发送 该任务在执行数秒后阻塞(task之间不再有数据交换),源端的backpressure=1,一段时间后TM失去心跳响应,任务异常终止,此时TaskManager进程存活,但通过jstat观察内存使用情况后,发现内存被耗尽,且不停的FullGC Case2. 我构造的数据源中交叉发送事件A和事件B,全速发送 该任务能够正常执行,但源端的backpressure=1,且整个任务的处理性能非常低,大约只有不到5000的eps 问题如下: 1. Flink CEP是否存在潜在的性能问题? 2. 为什么会出现OOM,是否使用不当导致的 3. 总数据量的大小应该小于3GB,是否是由于cep operator state累积导致的OOM?如果是的话有没有办法可以解决(使用rocksdb state?) FYI: 数据类型是JSONObject(fast-json),单条大小约20Byte,累计发送数据15,000,000条 事件时间为真实系统时间,由于发送线程未设置间隔,因此理论上所有数据的时间应当很接近(为了模拟高eps的场景) Flink版本是1.9.1 集群由1个JobManager和2个TaskManager组成,单个TaskManager的heap大约16GB 并发度为5 使用Flink SQL MATCH_RECOGNIZE关键字来实现相同case时,没有出现内存溢出的问题,且反压相对正常(0.4-0.5) 代码如下 ```java final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); /** * {"event_name": "事件A", "src_address": "1.1.1.1", "occur_time": 1574934481585, "payload": "..."} * {"event_name": "事件B", "src_address": "2.2.2.2", "occur_time": 1574934481586, "payload": "..."} */ DataStream<JSONObject> stream = env.addSource(new IncreasingFastJsonSource(15_000_000, 10)) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(3000)) { @Override public long extractTimestamp(JSONObject element) { return element.getLongValue("occur_time"); } }); stream = stream.map(element -> element.fluentRemove("payload")) .keyBy(element -> element.getIntValue("key")); AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent(); Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject> begin("A", skipStrategy).where(new SimpleCondition<JSONObject>() { @Override public boolean filter(JSONObject value) throws Exception { return StringUtils.equals(value.getString("event_name"), "事件A"); } }) .followedBy("B").where(new SimpleCondition<JSONObject>() { @Override public boolean filter(JSONObject value) throws Exception { return StringUtils.equals(value.getString("event_name"), "事件B"); } }) .within(Time.seconds(10)); CEP.pattern(stream, pattern) .select(FollowedBy::select) .print(); env.execute("Benchmark"); ``` |
Free forum by Nabble | Edit this page |