Flink: asking for help with a case where the buffer pool blows up

梁溪
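Business scenario: the data volume is huge and our current machines cannot keep up, so we need to sample. However, some of the data must always be processed, so the source data falls into two categories:
1) records that match the whitelist condition from the dimension table are passed downstream directly;
2) all other records are sampled before being passed downstream.
The filtered data then goes through business ETL cleaning to produce base data. Whitelisted records in the base data are written to the sink directly, and the base data is also aggregated over windows, with the results written to the sink.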
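The two-way split described above (whitelisted records pass straight through, everything else is sampled) can be sketched in plain Java, independent of Flink. This is a minimal sketch under stated assumptions: the post elides the actual sampling condition, so this version swaps in reservoir sampling (Algorithm R) to keep a uniform sample of at most k non-whitelisted records per window. The class WindowedSampler, its offer method, and the injected clock parameter nowMillis are hypothetical names for illustration; a real job would pass System.currentTimeMillis() as the post's code does. Like the post's version, a window is only flushed when the next record arrives.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical helper (not a Flink API): whitelist routing plus a bounded
// reservoir sample (Algorithm R) of the remaining records per time window.
class WindowedSampler {
    private final Set<String> whitelist;
    private final int k;                 // max sampled records kept per window
    private final long windowMillis;     // window length, e.g. 60_000 for 1 minute
    private final Random random;
    private final List<String> reservoir = new ArrayList<>();
    private int seen = 0;                // non-whitelisted records offered in the current window
    private long windowStart;

    WindowedSampler(Set<String> whitelist, int k, long windowMillis, long startMillis, Random random) {
        this.whitelist = new HashSet<>(whitelist);
        this.k = k;
        this.windowMillis = windowMillis;
        this.windowStart = startMillis;
        this.random = random;
    }

    // Returns what should be emitted now: the previous window's sample if that
    // window has just closed, followed by the record itself if it is whitelisted.
    List<String> offer(String record, long nowMillis) {
        List<String> out = new ArrayList<>();
        if (nowMillis - windowStart >= windowMillis) {
            out.addAll(reservoir);       // only real entries, never empty (null) slots
            reservoir.clear();
            seen = 0;
            windowStart = nowMillis;
        }
        if (whitelist.contains(record)) {
            out.add(record);             // "green": always forwarded immediately
            return out;
        }
        seen++;                          // "yellow": candidate for the sample
        if (reservoir.size() < k) {
            reservoir.add(record);       // fill the reservoir first
        } else {
            int r = random.nextInt(seen); // uniform in [0, seen)
            if (r < k) {
                reservoir.set(r, record); // keep the new record with probability k / seen
            }
        }
        return out;
    }
}
```

Because every non-whitelisted record is either dropped or replaces an existing slot, memory per window stays bounded by k regardless of input rate, and the flush returns only slots that were actually filled.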
The whole pipeline looks like this:
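1. sample = source
    .connect(ruleMap1) // join with the dimension table: whitelisted records are sent directly, all others are sampled first
    .process(new BroadcastProcessFunction<String, List<String>, Row>() {
        private static final int K = 10000;              // max sampled records sent per sampling window
        private int j = 0;                                // number of filled slots in array
        private Row[] array = new Row[K];                 // sampled records waiting to be sent downstream
        private final AtomicInteger counter = new AtomicInteger(0);
        private long start = System.currentTimeMillis(); // start of the current sampling window
        private List<String> omgidList = new ArrayList<>();

        @Override
        public void processBroadcastElement(List<String> currentList, Context ctx, Collector<Row> out) throws Exception {
            omgidList = currentList; // latest whitelist from the broadcast stream
        }

        @Override
        public void processElement(String input, ReadOnlyContext ctx, Collector<Row> out) throws Exception {
            Row row = new Row(2);
            row.setField(0, input);
            if (....) {            // 1) whitelist logic: if the condition holds, tag the row "green" and send it downstream directly
                row.setField(1, "green");
                out.collect(row);
            } else if (...) {      // 2) sampling logic: randomly pick records and put them into array
                row.setField(1, "yellow");
                if (j < K) {
                    array[j++] = row; // bounds check: avoids ArrayIndexOutOfBoundsException once K slots are used
                }
            }
            // no early return above, so the window check runs for every element
            long diff = System.currentTimeMillis() - start;
            if (diff / 1000 >= 60) { // 3) when the 1-minute sampling window has elapsed, send the sample and reset counter, timer, and array
                for (int k = 0; k < j; k++) { // send only filled slots; the rest of array is null
                    out.collect(array[k]);
                }
                counter.set(0);
                j = 0;
                start = System.currentTimeMillis();
                array = new Row[K];
            }
        }
    });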


2. tmp = sample.connect(ruleMap2).process(ETL);
3. basic = tmp.filter(row -> "green".equals(row.getField(1))).map();   //basic.addSink()
4. aggr = tmp.flatMap().keyBy(random).window(5min).aggregate(sum).map(); //aggr.addSink()
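Steps 3 and 4 rely on two Java/Flink details that may be worth checking. In Java, == on strings compares object references, so a tag string rebuilt by deserialization after crossing the network is generally a different object from the literal "green", while equals compares characters. Separately, Flink's KeySelector contract expects the same key every time it is invoked on the same record, which a key drawn from a random generator does not guarantee; a common alternative for spreading load is a bucket derived from a hash of a stable field. The plain-Java sketch below illustrates both points. PipelineChecks, isGreen, bucketFor, the id "user-42", and the bucket count 16 are hypothetical names and values for illustration; in a real job bucketFor would be called inside a KeySelector.

```java
// Hypothetical helpers for illustration; not part of the original job.
class PipelineChecks {
    static final String GREEN = "green";

    // Compares contents, so it also works for a deserialized copy of the tag.
    static boolean isGreen(Object tag) {
        return GREEN.equals(tag);
    }

    // Deterministic bucket in [0, buckets): the same id always maps to the
    // same bucket, unlike a key drawn from Random on every call.
    static int bucketFor(String id, int buckets) {
        return Math.floorMod(id.hashCode(), buckets);
    }

    public static void main(String[] args) {
        String copy = new String("green"); // stands in for a string rebuilt by deserialization
        System.out.println(copy == GREEN);  // false: different objects
        System.out.println(isGreen(copy));  // true: same characters
        System.out.println(bucketFor("user-42", 16) == bucketFor("user-42", 16)); // true
    }
}
```

Hashing into a fixed number of buckets keeps the 5-minute aggregation spread across parallel subtasks while letting every subtask compute the same key for the same record.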


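Shortly after the job starts it fails with an error. The log says the IO buffer pool was destroyed ("Buffer pool is destroyed"), so the flatMap cannot emit elements. We sample 10,000 records per minute, so a 5-minute window holds only 50,000 records; each physical node has 3 GB of memory, and checkpointing is disabled. I have searched for a long time without finding the cause. Is the way I do the sampling wrong? Any pointers would be much appreciated.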
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:600)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:538)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:726)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:704)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.real.function.transform.StatInfoFlatMapFunction.flatMap(StatInfoFlatMapFunction.java:52)
at com.real.function.transform.StatInfoFlatMapFunction.flatMap(StatInfoFlatMapFunction.java:21)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:583)
... 21 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:244)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:245)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:168)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:132)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 36 more