Hi all,

We recently ran into a problem: under backpressure, after the job has been running for a while, data flow stalls completely both upstream and downstream. A thread dump taken with jstack shows that the source operator is blocked, while the downstream operators are observed to be waiting for data. Stack trace:

"Legacy Source Thread - Source: Custom Source (1/2)" #95 prio=5 os_prio=0 tid=0x00007fafa4018000 nid=0x57d waiting on condition [0x00007fb03d48a000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x000000074afaf508> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:241)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:210)
        at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:151)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:122)
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        - locked <0x000000074a5f6a98> (a java.lang.Object)
        at com.jd.bdp.flink.sink.jimdb.common.SourceTimeMillisMock.run(SourceTimeMillisMock.java:25)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)

Our initial read is that the task is stuck acquiring a memory segment, but the Flink UI shows plenty of "Memory Segments Available". Could anyone advise how to analyze and resolve this kind of situation? Thanks in advance.
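The blocking pattern in the stack trace (requestMemorySegmentBlocking parking on a CompletableFuture) can be reproduced with a toy pool. This is a minimal sketch, NOT Flink's actual LocalBufferPool: the real pool completes the future when a downstream consumer recycles a buffer, so if backpressure never clears, the future never completes and jstack shows the thread WAITING on a CompletableFuture$Signaller, exactly as above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Toy stand-in for a buffer pool: request() hands out a buffer if one is
// free, otherwise returns an incomplete future that the caller blocks on
// with get() -- the same park site seen in the jstack output.
class ToyBufferPool {
    private final Queue<byte[]> free = new ArrayDeque<>();
    private CompletableFuture<byte[]> pending; // sketch: supports one waiter

    ToyBufferPool(int buffers, int size) {
        for (int i = 0; i < buffers; i++) free.add(new byte[size]);
    }

    synchronized CompletableFuture<byte[]> request() {
        byte[] b = free.poll();
        if (b != null) return CompletableFuture.completedFuture(b);
        pending = new CompletableFuture<>(); // caller will park on get()
        return pending;
    }

    synchronized void recycle(byte[] b) {
        if (pending != null) { pending.complete(b); pending = null; }
        else free.add(b);
    }
}

public class ToyBufferPoolDemo {
    public static void main(String[] args) throws Exception {
        ToyBufferPool pool = new ToyBufferPool(1, 32);
        byte[] first = pool.request().get();                // 1 buffer: ok
        CompletableFuture<byte[]> blocked = pool.request(); // empty: would park
        System.out.println("second request blocked: " + !blocked.isDone());
        pool.recycle(first);                                // "downstream" recycles
        System.out.println("second request unblocked: " + blocked.isDone());
    }
}
```

The takeaway for debugging: a thread parked here is not leaking memory, it is waiting for buffers that only come back when the consumer side makes progress.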
First of all, memory segments will not all be handed to a single backpressured channel and exhausted by it; you can read up on how Flink's backpressure flow works.
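The point above can be illustrated with a back-of-the-envelope model of credit-based flow control. The numbers below are made up for illustration, not Flink's defaults: each channel owns a fixed quota of exclusive buffers, and all channels share a bounded pool of floating buffers, so even a fully backpressured channel is capped at its exclusive quota plus the floating pool, and the other channels keep their exclusive buffers.

```java
import java.util.concurrent.Semaphore;

// Hypothetical quota numbers: 4 channels, 2 exclusive buffers each,
// 8 floating buffers shared by all channels on the gate.
public class ChannelQuotaDemo {
    static final int CHANNELS = 4;
    static final int EXCLUSIVE = 2;                     // per channel
    static final Semaphore FLOATING = new Semaphore(8); // shared, bounded

    public static void main(String[] args) {
        // Worst case: one backpressured channel grabs every floating buffer.
        int grabbedFloating = FLOATING.drainPermits();
        int slowChannelHolds = EXCLUSIVE + grabbedFloating;
        // The remaining channels still keep their own exclusive buffers.
        int guaranteedToOthers = (CHANNELS - 1) * EXCLUSIVE;
        System.out.println("slow channel holds at most " + slowChannelHolds);
        System.out.println("others still guaranteed " + guaranteedToOthers);
    }
}
```

So a single slow channel cannot drain the entire network memory by itself, which is consistent with the UI showing segments still available.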
There are generally two ways to deal with backpressure:
1. Increase the parallelism.
2. Optimize the key used in keyBy so that keys are spread out as evenly as possible, e.g. with a two-level keyBy: the first keyBy appends a random number to scatter the hot keys, and the second keyBy aggregates the partial results.

熊云昆

On 2020-10-27 18:50, 1548069580 wrote the message quoted above.
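The two-level keyBy trick can be sketched in plain Java. This is illustrative only: in a real Flink job both phases would be keyBy(...) followed by an aggregate/reduce operator, and the salt bucket count here is an arbitrary choice. Phase 1 spreads a hot key across N salted sub-keys so partial counts can be computed in parallel; phase 2 strips the salt and merges the partials per original key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class TwoPhaseAggregation {
    static final int SALT_BUCKETS = 4; // how far each hot key is scattered

    public static Map<String, Long> aggregate(String[] events) {
        Random rnd = new Random();
        // Phase 1: key by "key#salt" -> partial count. A key seen N times is
        // now spread over up to SALT_BUCKETS sub-keys instead of one.
        Map<String, Long> partial = new HashMap<>();
        for (String key : events) {
            String salted = key + "#" + rnd.nextInt(SALT_BUCKETS);
            partial.merge(salted, 1L, Long::sum);
        }
        // Phase 2: strip the salt, key by the original key, sum the partials.
        Map<String, Long> total = new HashMap<>();
        partial.forEach((salted, count) ->
            total.merge(salted.substring(0, salted.indexOf('#')), count, Long::sum));
        return total;
    }

    public static void main(String[] args) {
        String[] events = {"hot", "hot", "hot", "hot", "cold"};
        System.out.println(aggregate(events)); // final counts per original key
    }
}
```

The final totals are identical to a direct single-phase count; the salt only changes how the intermediate work is distributed.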