When deploying a Flink project to production, the job fails at startup with the error below, although it starts normally in the test environment. The job depends on Flink 1.10, and the Flink cluster version is 1.12-SNAPSHOT, the same as in production. I can't make sense of it; could anyone help analyze it?

Stack trace:

    java.lang.IllegalArgumentException: key group from 44 to 45 does not contain 4
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:187)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:182)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:176)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:112)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:217)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:884)
        at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:42)
        at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:898)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:399)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:567)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)

The code logic is roughly:

    DataStream stream = dataStream
        .keyBy(keyBy(globalParallelism))
        .window(window(downsampling))
        .reduce(reduce(trackerType), processWindow(trackerType), TypeInformation.of(Metrics.class))
        .keyBy(secondKeyBy())
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
        .reduce(reduce(trackerType), processSecondWindow(trackerType), TypeInformation.of(Metrics.class))
        .rebalance()
        .addSink(sink())
        .setParallelism(globalParallelism/2);

    public KeySelector<Metrics, String> keyBy(int parallelism) {
        return value -> Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(), ThreadLocalRandom.current().nextInt(parallelism));
    }

    public KeySelector<Metrics, String> secondKeyBy() {
        return value -> Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(), value.getWindowEnd());
    }

Note: the two-stage keyBy is there to deal with data skew. The first keyBy feeds a tumbling window based on event time; the second keyBy feeds a session window based on processing time.

-- Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,
The cause is that your key selector introduces a random variable (the keyBy method below), so the key it selects for a given record is not fixed:

    public KeySelector<Metrics, String> keyBy(int parallelism) {
        return value -> Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(), ThreadLocalRandom.current().nextInt(parallelism));
    }

For example, suppose the key selector first picks key-A, which maps (after hashing and a modulo) to key group 44. The record is then correctly sent to the downstream task whose key group range contains 44. Once the record has reached that task, however, the key group is computed again when the timer is added to the timer queue. Because your key selector is random, this second call may pick key-B, and the modulo for key-B yields key group 4. That key group may not belong to your task (key groups 44-45), which produces the exception you see.

Best regards,
Tang Yun (唐云)

________________________________
From: restart <[hidden email]>
Sent: Thursday, January 28, 2021 17:54
To: [hidden email] <[hidden email]>
Subject: key group from xx to yy does not contain zz exception
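The mismatch described in the reply can be reproduced without a cluster. The following is a minimal sketch in plain Java, not Flink code: `KeyGroupDemo`, `keyGroupFor`, `randomSelector`, `stableSelector`, and `countMismatches` are hypothetical names, `java.util.function.Function` stands in for Flink's `KeySelector`, and the key group is approximated with a plain modulo of `hashCode()` (Flink additionally applies a murmur hash before the modulo). It calls the selector twice per record, once for "routing" and once for "timer registration", and counts how often the two calls disagree.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;

final class KeyGroupDemo {

    // Simplified stand-in for key-group assignment (assumption: plain modulo,
    // whereas Flink also murmur-hashes key.hashCode() first).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Nondeterministic selector, shaped like the one in the original post.
    static Function<String, String> randomSelector(int parallelism) {
        return id -> id + "|" + ThreadLocalRandom.current().nextInt(parallelism);
    }

    // Deterministic selector: the key depends only on the record.
    static Function<String, String> stableSelector() {
        return id -> id;
    }

    // Evaluates the selector twice per trial and counts the trials in which
    // the two evaluations land in different key groups.
    static int countMismatches(Function<String, String> selector, String record,
                               int maxParallelism, int trials) {
        int mismatches = 0;
        for (int i = 0; i < trials; i++) {
            int routedGroup = keyGroupFor(selector.apply(record), maxParallelism); // upstream routing
            int timerGroup = keyGroupFor(selector.apply(record), maxParallelism);  // timer registration
            if (routedGroup != timerGroup) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println("random selector mismatches: "
                + countMismatches(randomSelector(16), "metric-1", 128, 1000));
        System.out.println("stable selector mismatches: "
                + countMismatches(stableSelector(), "metric-1", 128, 1000));
    }
}
```

With the random selector most trials disagree, and each disagreement corresponds to a record whose timer could be assigned to a key group outside the task's range. The stable selector never disagrees.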
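Thank you for the explanation; my understanding of how keyBy executes was clearly too shallow. If I generate the random number in a map before the keyBy and assign it to a concrete field, so that the key stays stable in the subsequent keyBy, that should fix it. Thanks again for pointing me in the right direction.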
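The fix proposed above can be sketched as follows. This is a plain-Java illustration under stated assumptions, not the poster's actual code: `SaltedKeyDemo`, the pared-down `Metrics` class, its `salt` field, `withSalt`, and `saltedKey` are all hypothetical, and `java.util.function.Function` again stands in for Flink's `KeySelector`. The random salt is drawn once in a map-like step and stored on the record, so the key selector becomes a pure function of the record.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;

final class SaltedKeyDemo {

    // Hypothetical, pared-down stand-in for the Metrics type in the thread.
    static final class Metrics {
        final String metricsId;
        final int salt; // assigned once, before keying

        Metrics(String metricsId, int salt) {
            this.metricsId = metricsId;
            this.salt = salt;
        }
    }

    // The "map" step: the only place where randomness is used.
    static Metrics withSalt(String metricsId, int parallelism) {
        return new Metrics(metricsId, ThreadLocalRandom.current().nextInt(parallelism));
    }

    // The key selector only reads fields, so repeated calls on the same
    // record always return the same key.
    static Function<Metrics, String> saltedKey() {
        return m -> m.metricsId + "|" + m.salt;
    }

    public static void main(String[] args) {
        Metrics record = withSalt("metric-1", 16);
        String first = saltedKey().apply(record);
        String second = saltedKey().apply(record);
        System.out.println(first + " equals " + second + ": " + first.equals(second));
    }
}
```

In the actual job this would correspond to a map that sets the salt on each record, placed before the first keyBy, with the key selector joining the metrics ID and that stored salt. The salt still spreads a hot metrics ID across several keys, which preserves the skew mitigation the two-stage keyBy was meant to provide.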