Writing to Kafka occasionally throws FlinkKafka011Exception and stops the job

Writing to Kafka occasionally throws FlinkKafka011Exception and stops the job

butnet@163.com
A job reads data from Kafka, computes statistics, and writes the results back to Kafka.
Occasionally it throws FlinkKafka011Exception, which stops the job.
How do you usually handle this? Catch the exception and log it?

Producer construction code:
FlinkKafkaProducer011 producer = new FlinkKafkaProducer011(kafkaOutputTopic,
        new KafkaSerializationSchema(KAFKA_OUTPUT_TYPE),
        producerConfig,
        Optional.of(new KafkaPartitionerByKey<>()),
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
        kafkaProducersPoolSize);
Kafka configuration:
bootstrap.servers=
enable.auto.commit=true
max.poll.records=1000
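
Two of the properties above (enable.auto.commit, max.poll.records) are Kafka consumer settings and have no effect on a producer. For an EXACTLY_ONCE producer it is the transactional and retry settings that matter: the Kafka client can retry transient send failures (such as a broker disconnect) itself, and transaction.timeout.ms should not exceed the broker's transaction.max.timeout.ms (15 minutes by default). A hedged sketch of such a producer config (the method name and values are illustrative assumptions, not taken from the original post):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Build producer properties suited to EXACTLY_ONCE / transactional writes.
    public static Properties buildProducerConfig(String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        // Let the Kafka client retry transient send failures (e.g. broker
        // disconnects) before surfacing an error to the Flink sink.
        props.setProperty("retries", "5");
        props.setProperty("retry.backoff.ms", "500");
        // Must be <= the broker's transaction.max.timeout.ms (default 15 min);
        // Flink's default of 1 hour exceeds that and is rejected by the broker.
        props.setProperty("transaction.timeout.ms", "900000");
        return props;
    }
}
```

These properties would then be passed as the producerConfig argument of the FlinkKafkaProducer011 constructor shown above.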

The following exception occurs occasionally:

2019-07-20 21:41:40,576 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(SlidingEventTimeWindows(120000, 60000), EventTimeTrigger, InterfaceStatisticsAggregate, InterfaceStatisticsWindow) -> (Map, Sink: Unnamed, Sink: Unnamed) (4/6) (1a9a0b3a306ca983b2fc4227aa7a10fe) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.lambda$apply$0(InterfaceStatisticsWindow.java:35)   // this line is the WindowFunction's apply method emitting into Collector<InterfaceStatisticsOutDto> out: out.collect(item)
        at java.util.Collections$SingletonList.forEach(Collections.java:4822)
        at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.apply(InterfaceStatisticsWindow.java:31)
        at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.apply(InterfaceStatisticsWindow.java:16)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
        ... 7 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: The server disconnected before a response was received.
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        ... 25 more

How does everyone handle this situation?
Is something wrong in my configuration?
My current approach is to catch this Exception where the output happens; I'd like to know how others handle it:
/**
 * @author butnet
 */
public class InterfaceStatisticsWindow implements WindowFunction<InterfaceStatisticsOutDto, InterfaceStatisticsOutDto, String, TimeWindow> {
    private static final Logger log = LoggerFactory.getLogger(InterfaceStatisticsWindow.class);
    private static final long serialVersionUID = 1L;

    @Override
    public void apply(String key, TimeWindow window, Iterable<InterfaceStatisticsOutDto> input,
                      Collector<InterfaceStatisticsOutDto> out) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("InterfaceStatisticsWindow apply:" + key + " start:" + window.getStart() + " " + window.getEnd());
        }
        input.forEach((Consumer<? super InterfaceStatisticsOutDto>) ir -> {
            ir.setWindowsStartTime(window.getStart());
            ir.setWindowsEndTime(window.getEnd());
            ir.setInterfaceName(key);
            try {
                out.collect(ir);
            } catch (Exception ex) {
                log.info("output exception: " + ex.toString(), ex);
            }
            if (log.isDebugEnabled()) {
                log.debug("InterfaceStatisticsWindow apply forEach:" + ir.getInterfaceName());
            }
        });
    }
}
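
Catching the exception inside the window function as above swallows a failed transactional write: with EXACTLY_ONCE semantics the record is simply lost and the sink's guarantee no longer holds. A common alternative is to let the exception propagate and have Flink restart the job from the last checkpoint. A hedged sketch of the corresponding flink-conf.yaml settings (Flink 1.x fixed-delay restart strategy; the attempt count and delay are illustrative assumptions):

```yaml
# Fixed-delay restart: retry the job up to 3 times, 10 s apart,
# recovering from the last successful checkpoint each time.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

The same strategy can also be set per job via StreamExecutionEnvironment#setRestartStrategy.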


Thanks
[hidden email]

Re: Writing to Kafka occasionally throws FlinkKafka011Exception and stops the job

zhisheng
FlinkKafka011Exception: Failed to send data to Kafka: The server
disconnected before a response was received.

Looking at this exception, it seems the connection to the Kafka server was dropped while the sink was writing data to Kafka.
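
For a transient disconnect like this, one general pattern (besides the Kafka client's own retries setting) is bounded retry with eventual re-throw, so the failure still surfaces to the framework instead of being swallowed. This plain-Java sketch is only an illustration of the pattern; the class and method names are hypothetical and it is not Flink or Kafka API:

```java
import java.util.function.Supplier;

public class RetrySketch {
    // Retry a transient failure up to maxAttempts (assumed >= 1), then
    // re-throw the last exception so the caller (e.g. the Flink runtime)
    // still sees the error and can trigger recovery.
    public static <T> T withRetries(Supplier<T> action, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException ex) {
                last = ex;  // remember the failure and try again
            }
        }
        throw last;
    }
}
```

Note this is only safe around idempotent operations; for a transactional EXACTLY_ONCE sink, relying on Flink's checkpoint-based restart is the simpler and safer choice.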

[hidden email] <[hidden email]> wrote on Sat, Jul 20, 2019 at 10:37 PM:
