A job reads data from Kafka, computes statistics, and writes the results back to Kafka. Occasionally it throws a FlinkKafka011Exception, which stops the job. How do people usually handle this — catch it and log it?

Producer construction code:

    FlinkKafkaProducer011 producer = new FlinkKafkaProducer011(
            kafkaOutputTopic,
            new KafkaSerializationSchema(KAFKA_OUTPUT_TYPE),
            producerConfig,
            Optional.of(new KafkaPartitionerByKey<>()),
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
            kafkaProducersPoolSize);

Kafka configuration:

    bootstrap.servers=
    enable.auto.commit=true
    max.poll.records=1000

The following exception occurs occasionally:

2019-07-20 21:41:40,576 INFO org.apache.flink.runtime.taskmanager.Task - Window(SlidingEventTimeWindows(120000, 60000), EventTimeTrigger, InterfaceStatisticsAggregate, InterfaceStatisticsWindow) -> (Map, Sink: Unnamed, Sink: Unnamed) (4/6) (1a9a0b3a306ca983b2fc4227aa7a10fe) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.lambda$apply$0(InterfaceStatisticsWindow.java:35)  // this line is the WindowFunction's apply method emitting to Collector<InterfaceStatisticsOutDto> out: out.collect(item)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.apply(InterfaceStatisticsWindow.java:31)
    at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.apply(InterfaceStatisticsWindow.java:16)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: The server disconnected before a response was received.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    ... 25 more

I'd like to ask how everyone handles this situation — is something wrong with my configuration? My current approach is to catch the exception at the point of output, but I'm not sure how others deal with it:

    /**
     * @author butnet
     */
    public class InterfaceStatisticsWindow implements
            WindowFunction<InterfaceStatisticsOutDto, InterfaceStatisticsOutDto, String, TimeWindow> {
        private static final Logger log = LoggerFactory.getLogger(InterfaceStatisticsWindow.class);
        private static final long serialVersionUID = 1L;

        @Override
        public void apply(String key, TimeWindow window,
                          Iterable<InterfaceStatisticsOutDto> input,
                          Collector<InterfaceStatisticsOutDto> out) throws Exception {
            if (log.isDebugEnabled()) {
                log.debug("InterfaceStatisticsWindow apply:" + key
                        + " start:" + window.getStart() + " " + window.getEnd());
            }
            input.forEach((Consumer<? super InterfaceStatisticsOutDto>) ir -> {
                ir.setWindowsStartTime(window.getStart());
                ir.setWindowsEndTime(window.getEnd());
                ir.setInterfaceName(key);
                try {
                    out.collect(ir);
                } catch (Exception ex) {
                    log.info("output exception: " + ex.toString(), ex);
                }
                if (log.isDebugEnabled()) {
                    log.debug("InterfaceStatisticsWindow apply forEach:" + ir.getInterfaceName());
                }
            });
        }
    }

Thanks
[hidden email]
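One thing worth noting: `enable.auto.commit` and `max.poll.records` are consumer-side properties and have no effect on the producer. For a producer running with `Semantic.EXACTLY_ONCE`, transient broker disconnects can often be absorbed by the Kafka client itself via retries, and the transaction timeout should stay within the broker's `transaction.max.timeout.ms` (15 minutes by default). Below is a minimal sketch of such a producer configuration; the class name and the specific values are illustrative assumptions, not taken from the original post:

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Builds producer properties intended to tolerate transient disconnects.
    public static Properties producerConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Let the Kafka client retry transient failures (e.g. a broker
        // dropping the connection) instead of surfacing them to the sink
        // immediately. The counts/backoff here are illustrative.
        props.setProperty("retries", "5");
        props.setProperty("retry.backoff.ms", "1000");
        // EXACTLY_ONCE uses Kafka transactions; this value must not exceed
        // the broker's transaction.max.timeout.ms (default 15 minutes).
        props.setProperty("transaction.timeout.ms", "900000");
        return props;
    }
}
```

The returned `Properties` would then be passed as the `producerConfig` argument of the `FlinkKafkaProducer011` constructor shown above.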
FlinkKafka011Exception: Failed to send data to Kafka: The server
disconnected before a response was received.

Judging from this exception, it looks like the connection to the Kafka server was dropped while the sink was writing data to Kafka.

[hidden email] <[hidden email]> wrote on Sat, Jul 20, 2019 at 10:37 PM:

> (quoted original message trimmed)
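Catching the exception inside the WindowFunction, as in the posted code, silently drops the window result and works against the EXACTLY_ONCE guarantee the producer was configured for. A common alternative is to let the task fail and configure a restart strategy so the job restarts from the last checkpoint after transient Kafka failures. A sketch of this in flink-conf.yaml, with illustrative values (it can equally be set programmatically on the StreamExecutionEnvironment):

```yaml
# Restart the job up to 3 times, waiting 10 s between attempts,
# instead of moving it permanently to FAILED on the first exception.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

This only helps if checkpointing is enabled, since recovery under EXACTLY_ONCE resumes from the last completed checkpoint.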