Kafka transaction OutOfOrderSequenceException with checkpointing enabled

Chris Guo
Dear all:
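The Flink version is 1.12.4 and the Kafka version is 1.1.1. The job topology is very simple: source --> flatmap --> sink. With checkpointing enabled, the job fails after running for a few hours with the following error: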


Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: The broker received an out of order sequence number.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[LogAnalyseFlink-1.0.jar:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850) ~[LogAnalyseFlink-1.0.jar:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
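
As a rough analogy only, the shape of the source --> flatmap --> sink topology described above can be mimicked in plain Java streams. This is not Flink code and does not model checkpointing or Kafka at all; the class name `TopologySketch`, the sample input lines and the whitespace-splitting step are hypothetical stand-ins for the real source, flatmap and sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical plain-Java analogy of a source --> flatmap --> sink pipeline.
// It does not use the Flink API.
public class TopologySketch {

    // "Source": a fixed, hypothetical list of log lines.
    static Stream<String> source() {
        return Stream.of("GET /a 200", "POST /b 500");
    }

    // "flatmap": one input record may produce zero or more output records.
    static Stream<String> splitFields(String line) {
        return Arrays.stream(line.split("\\s+"));
    }

    // "Sink": collects the records; the real job writes them to Kafka instead.
    static List<String> run() {
        List<String> sink = new ArrayList<>();
        source().flatMap(TopologySketch::splitFields).forEach(sink::add);
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```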




The producer has no special configuration and uses the default AT_LEAST_ONCE semantic:
        import java.util.Properties;
        import org.apache.kafka.clients.producer.ProducerConfig;

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
        // at most one unacknowledged request per broker connection
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "2000");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "524288");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100");
        properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
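
The `ProducerConfig` constants in the configuration above resolve to plain string keys, so the same settings can be written without the Kafka client on the classpath. That makes it easy to diff them against the "ProducerConfig values:" block the Kafka client logs when the producer starts. A minimal sketch; the class name `ProducerPropsSketch` is hypothetical, and the key strings are the names from Kafka's producer configuration documentation.

```java
import java.util.Properties;

// Same producer settings as in the post, using the literal Kafka config keys
// that the ProducerConfig constants resolve to. Class name is hypothetical.
public class ProducerPropsSketch {

    static Properties build() {
        Properties p = new Properties();
        p.setProperty("compression.type", "lz4");
        p.setProperty("retries", "5");
        // At most one unacknowledged request per broker connection.
        p.setProperty("max.in.flight.requests.per.connection", "1");
        p.setProperty("retry.backoff.ms", "2000");    // 2 s between retries
        p.setProperty("batch.size", "524288");        // 512 KiB per partition batch
        p.setProperty("linger.ms", "100");            // wait up to 100 ms to fill a batch
        p.setProperty("request.timeout.ms", "60000"); // 60 s request timeout
        return p;
    }

    public static void main(String[] args) {
        Properties p = build();
        // Sorted output is easier to compare with the client's startup log.
        p.stringPropertyNames().stream().sorted()
            .forEach(k -> System.out.println(k + " = " + p.getProperty(k)));
    }
}
```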


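Once the error occurs, I have to stop the job, and it only recovers if I wait 15 minutes for the transaction to expire before restarting it. I currently have no idea how to troubleshoot this and would appreciate any help.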
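
The manual recovery procedure (stop the job, wait for the old transaction to expire, restart) comes down to simple time arithmetic. A minimal sketch, assuming the 15-minute expiry observed above is counted from the moment the job is stopped; the class and method names are hypothetical, and the check says nothing about the root cause of the exception.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for the stop-wait-restart workaround described above.
public class RestartWindowSketch {

    // Expiry period observed in the post; treated here as an assumption.
    static final Duration TXN_EXPIRY = Duration.ofMinutes(15);

    // Earliest instant at which a restart is expected to recover.
    static Instant earliestRestart(Instant jobStopped) {
        return jobStopped.plus(TXN_EXPIRY);
    }

    // True once the full expiry period has elapsed since the job was stopped.
    static boolean safeToRestart(Instant jobStopped, Instant now) {
        return !now.isBefore(earliestRestart(jobStopped));
    }

    public static void main(String[] args) {
        Instant stopped = Instant.parse("2021-06-01T10:00:00Z");
        System.out.println(earliestRestart(stopped));
        System.out.println(safeToRestart(stopped, stopped.plus(Duration.ofMinutes(10))));
    }
}
```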