大家晚上好,我们这里用flink消费kafka,往mysql写。topic有9个分区,flink任务并行度是3。任务却一直报这个错:2019-08-21 19:33:07,058 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async Kafka commit failed.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257) 我已经尝试将session.timeout.ms设置成60s,但结果还是一样的。flink任务的逻辑是kafka->map->filter->split,最后select出来两个流,插到两个不同的mysql表。这里是因为并行度是3的原因,所以导致了触发了consumer的rebalance吗? |
你这个用的是什么版本,我看社区版现在都是支持 kafka assign 接口,走 subscribe 接口的话会走 kafka group 管理,这个出现下面问题的几率会比较大,比如:consumer 超过一定时间没有调用 poll 接口就会触发。
--- Best, Matt Wang Original Message Sender:[hidden email] Recipient:[hidden email] Date:Wednesday, Aug 21, 2019 20:26 Subject:flink频繁报向kafka提交offset失败的错误 大家晚上好,我们这里用flink消费kafka,往mysql写。topic有9个分区,flink任务并行度是3。任务却一直报这个错:2019-08-21 19:33:07,058 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async Kafka commit failed. org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257) 我已经尝试将session.timeout.ms设置成60s,但结果还是一样的。flink任务的逻辑是kafka-map-filter-split,最后select出来两个流,插到两个不同的mysql表。这里是因为并行度是3的原因,所以导致了触发了consumer的rebalance吗? |
Free forum by Nabble | Edit this page |