We have a business scenario where the data volume is so large that the current machine resources cannot keep up, so we need to sample. However, part of the data must always be processed, so the source records fall into two categories:
1) Records that match the dimension-table whitelist are forwarded downstream directly; 2) everything else has to be sampled before it is forwarded. The filtered data then goes through the business ETL cleansing to produce the basic data; the whitelisted part of the basic data is written out directly, and the basic data is also written out after a windowed aggregation. The whole pipeline is as follows:

1. Sampling step:

sample = source
    .connect(ruleMap1) // join the dimension table: whitelist hits are emitted directly, everything else is sampled first
    .process(new BroadcastProcessFunction<String, List<String>, Row>() {
        private final int K = 10000;
        private Row[] array = new Row[K]; // sampled records waiting to be sent downstream
        private final AtomicInteger counter = new AtomicInteger(0);
        private long start = System.currentTimeMillis();
        private List<String> omgidList = new ArrayList<>();

        @Override
        public void processBroadcastElement(List<String> currentList, Context ctx, Collector<Row> out) throws Exception {
            omgidList = currentList;
        }

        @Override
        public void processElement(String input, ReadOnlyContext ctx, Collector<Row> out) throws Exception {
            Row row = new Row(2);
            row.setField(0, input);
            if (...) { // 1) whitelist logic: if the record qualifies, tag it "green" and emit it straight away
                row.setField(1, "green");
                out.collect(row);
            }
            if (...) { // 2) sampling logic: randomly picked records are tagged "yellow" and buffered in the array
                row.setField(1, "yellow");
                array[counter.getAndIncrement() % K] = row;
                return;
            }
            long diff = System.currentTimeMillis() - start;
            if (diff / 1000 >= 60) { // 3) once the 1-minute sampling window ends, emit the buffered samples and reset the counter, timer and array
                int n = Math.min(counter.get(), K);
                for (int k = 0; k < n; k++) {
                    out.collect(array[k]);
                }
                counter.set(0);
                start = System.currentTimeMillis();
                array = new Row[K];
            }
        }
    });

2. tmp = sample.connect(ruleMap2).process(ETL);

3. basic = tmp.filter(row -> "green".equals(row.getField(1))).map(...);    // basic.addSink(...)

4. aggr = tmp.flatMap(...).keyBy(random).window(5min).aggregate(sum).map(...);    // aggr.addSink(...)

The job fails shortly after it starts. The log says the buffer pool has been destroyed, so the flatMap can no longer forward elements. The sample is only 10,000 records per minute, i.e. roughly 50,000 per 5-minute window; the physical node has 3 GB of memory and checkpointing is disabled. I've been hunting for the cause for a long time with no luck. Is my sampling approach wrong? Any pointers would be greatly appreciated.

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:600)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:558)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:538)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:726)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:704)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.real.function.transform.StatInfoFlatMapFunction.flatMap(StatInfoFlatMapFunction.java:52)
    at com.real.function.transform.StatInfoFlatMapFunction.flatMap(StatInfoFlatMapFunction.java:21)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:583)
    ... 21 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:244)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:245)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:168)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:132)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    ... 36 more
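In case the downstream wiring matters: step 4 above is only pseudocode, so here is a rough sketch of what it amounts to. The real job uses StatInfoFlatMapFunction plus a custom aggregate; the (key, count) tuple layout, the 0-9 random key spread and the field access below are simplified stand-ins I made up for illustration, not the actual code.

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class AggrSketch {

    // Simplified stand-in for step 4: expand each cleaned row into (statKey, 1) pairs,
    // spread them over 10 random keys and sum them in a 5-minute processing-time window.
    public static DataStream<Tuple2<String, Long>> buildAggr(DataStream<Row> tmp) {
        return tmp
            .flatMap(new FlatMapFunction<Row, Tuple2<String, Long>>() {
                @Override
                public void flatMap(Row row, Collector<Tuple2<String, Long>> out) {
                    // the real StatInfoFlatMapFunction emits several stat records per row
                    out.collect(Tuple2.of(String.valueOf(row.getField(0)), 1L));
                }
            })
            .keyBy(new KeySelector<Tuple2<String, Long>, Integer>() {
                @Override
                public Integer getKey(Tuple2<String, Long> value) {
                    // "keyBy(random)" from the pseudocode: a random 0-9 key just to spread load
                    return ThreadLocalRandom.current().nextInt(10);
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
            // the real pipeline has a trailing .map() and aggr.addSink(...) after this
            .sum(1);
    }
}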