Can flink kafka producer dynamically discover new parititons after expansion?

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Can flink kafka producer dynamically discover new parititons after expansion?

ぐ绿み翼う
hi,I have a source Kafka and a sink Kafka,when the amount of data processing grows,I need to expand Kafka topic's  partition number ,but I don't want to restart the job to take effect.
for source Kafka, I use flink.partition-discovery.interval-millis and it could consume the new parititon after I expand the Kafka topic's partition number.


but sink kafka don't work like this:


The flink kafka producer get topic's paritition list and cache in topicPartitionsMap as showed In Class FlinkKafkaProducer<IN&gt;&nbsp;:


int[] partitions = (int[])this.topicPartitionsMap.get(targetTopic);
if (null == partitions) {
    partitions = getPartitionsByTopic(targetTopic, transaction.producer);
    this.topicPartitionsMap.put(targetTopic, partitions);
}When kafka topic needs to expand,the new parition can not be discovered。For example, when expanding from 1 partition to 2 partitions,partitions we got is never update until job restartAre there plans to improve this feature or Is there any other way to achieve the function?