flink checkpoint kafka CommitFailedException

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

flink checkpoint kafka CommitFailedException

¥¥¥
flink1.4.2 kafka 0.10.2  每次checkppint都会有该warn
kafka使用:
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaServer);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"user-compute-test1");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"60000");
properties.setProperty("max.poll.interval.ms", "300000");
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"5000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");
FlinkKafkaConsumer010 consumer010 = new FlinkKafkaConsumer010<>(kafkatopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer010)