Re: flink on kubernetes job getting stuck


Re: flink on kubernetes job getting stuck

蒋佳成(Jiacheng Jiang)
The Map is backpressured; you need to look at which downstream operation is stuck, e.g. a slow write to ES.
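
One general way to locate the bottleneck (an illustration, not a statement about this particular job): the Web UI's BackPressure tab samples the task threads, and the per-subtask metrics inPoolUsage / outPoolUsage usually point at the slow operator; the first task whose inPoolUsage is high while its own outPoolUsage stays low is typically the one that cannot keep up. The same sampling is also exposed by the monitoring REST API, e.g.:

GET /jobs/<job-id>/vertices/<vertex-id>/backpressure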




------------------ Original Message ------------------
From: "a511955993"<[hidden email]>;
Sent: Thursday, May 7, 2020, 9:51 PM
To: "user-zh"<[hidden email]>;
Subject: flink on kubernetes job getting stuck



hi, all


Cluster information:
Flink version 1.10, deployed on Kubernetes; Kubernetes version 1.17.4, Docker version 19.03, and the CNI is Weave.


Symptom:
While the job is running, an operator occasionally gets stuck: downstream receives no data, the watermark stops advancing, backpressure propagates upstream, and after a while the job gets killed.


A fragment of the stack trace obtained via jstack is as follows:


"Map (152/200)" #155 prio=5 os_prio=0 tid=0x00007f67a4076800 nid=0x31f waiting on condition [0x00007f66b04ed000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000608f3c600> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
        at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)




We suspected a problem with the virtualized network and added the following parameters, with no effect:
taskmanager.network.request-backoff.max: 300000
akka.ask.timeout: 120s
akka.watch.heartbeat.interval: 10s


We also tried adjusting the number of buffers, with no effect:
taskmanager.network.memory.floating-buffers-per-gate: 16
taskmanager.network.memory.buffers-per-channel: 6
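
One more knob that may be worth checking (a general note, not something tried in the thread): the per-channel and floating buffers are allocated out of the TaskManager's overall network memory pool, so raising them only helps if that pool is large enough. In Flink 1.10 the pool size is controlled by the following keys; the values shown are the 1.10 defaults and are purely illustrative:
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb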




The above describes the scenario I am running into; any replies or explanations would be greatly appreciated.

Looking forward to your reply and help.

Best

Re: flink on kubernetes job getting stuck

Chris Guo

This map is actually a union operation; downstream is a keyBy + aggregation, and the data is sent to Kafka. There is no problem on the Kafka side.
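
To make the operator boundary concrete, below is a minimal, hypothetical sketch of the topology described above, written against the Flink 1.10 DataStream API. The sources, key, window, and topic names are illustrative placeholders, not the actual job; the jstack trace corresponds to the union/map task blocking while requesting an output buffer for the keyBy shuffle:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class UnionKeyByAggSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two placeholder sources standing in for the real inputs.
        DataStream<String> a = env.socketTextStream("hostA", 9000);
        DataStream<String> b = env.socketTextStream("hostB", 9000);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092");

        a.union(b)                                  // the "Map"/union task seen in the jstack output
         .keyBy(line -> line.split(",")[0])         // network shuffle: the upstream task blocks here waiting for output buffers when downstream is slow
         .timeWindow(Time.minutes(1))               // placeholder window for the keyed aggregation
         .reduce((x, y) -> x + "|" + y)             // stands in for the actual keyBy + agg logic
         .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), kafkaProps));

        env.execute("union-keyby-agg-sketch");
    }
}

If the aggregation or the Kafka sink cannot keep up, the blocked buffer request would show up exactly at that keyBy boundary, which matches the trace above.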




a511955993
Email: [hidden email]


On May 8, 2020, at 10:06, 蒋佳成 (Jiacheng Jiang) wrote:
The Map is backpressured; you need to look at which downstream operation is stuck, e.g. a slow write to ES.



