|
Hi,从kafka获取数据的while循环中,为啥不直接使用当前线程(LegacySourceFunctionThread),而是新创建了consumerThread每次取一条数据然后在交给当前线程
源码版本:1.10.0
// KafkaFetcher.java
@Override
public void runFetchLoop() throws Exception {
try {
final Handover handover = this.handover;
// kick off the actual Kafka consumer
consumerThread.start();
while (running) {
// this blocks until we get the next records
// it automatically re-throws exceptions encountered in the consumer thread
final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
}
}
}
finally {
// this signals the consumer thread that no more work is to be done
consumerThread.shutdown();
}
// on a clean exit, wait for the runner thread
try {
consumerThread.join();
}
谢谢回复
|