为啥KafkaFetcher要新开线程KafkaConsumerThread去消费获取数据,而不是使用LegacySourceFunctionThread去做loop循环呢

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

为啥KafkaFetcher要新开线程KafkaConsumerThread去消费获取数据,而不是使用LegacySourceFunctionThread去做loop循环呢

yuehan1
Hi,从kafka获取数据的while循环中,为啥不直接使用当前线程(LegacySourceFunctionThread),而是新创建了consumerThread每次取一条数据然后在交给当前线程

源码版本:1.10.0
// KafkaFetcher.java
@Override
public void runFetchLoop() throws Exception {
       try {
              final Handover handover = this.handover;

              // kick off the actual Kafka consumer
              consumerThread.start();

              while (running) {
                     // this blocks until we get the next records
                     // it automatically re-throws exceptions encountered in the consumer thread
                     final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

                     // get the records for each topic partition
                     for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

                     }
              }
       }
       finally {
              // this signals the consumer thread that no more work is to be done
              consumerThread.shutdown();
       }

       // on a clean exit, wait for the runner thread
       try {
              consumerThread.join();
       }

谢谢回复