|
Dear all:
flink版本是1.12.4,kafka版本是1.1.1。作业topology很简单,source-->flatmap--->sink ,在开启checkpoint,作业运行几个小时后会报错。报错内容如下
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: The broker received an out of order sequence number.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[LogAnalyseFlink-1.0.jar:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850) ~[LogAnalyseFlink-1.0.jar:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
product没有特殊的配置, 默认使用的是AT_LEAST_ONCE semantic
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "2000");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "524288");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100");
properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
如果报错了,就得把作业停掉,等事务15分钟后过期启动程序才可以恢复。 目前没有排查问题的思路,需要帮助。
|