Hi, I have a source Kafka topic and a sink Kafka topic. As the amount of data grows, I need to expand the Kafka topics' partition count, but I don't want to restart the job for the change to take effect.
For the source Kafka, I set flink.partition-discovery.interval-millis, and the consumer picks up the new partitions after I expand the topic's partition count.
But the sink Kafka does not work like this:
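For reference, this is how the source-side discovery is enabled; the property is passed to the Kafka consumer, and the 30-second interval here is just an example value:

```
flink.partition-discovery.interval-millis=30000
```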
The Flink Kafka producer fetches the topic's partition list once and caches it in topicPartitionsMap, as shown in the class FlinkKafkaProducer&lt;IN&gt;:
    int[] partitions = (int[]) this.topicPartitionsMap.get(targetTopic);
    if (null == partitions) {
        partitions = getPartitionsByTopic(targetTopic, transaction.producer);
        this.topicPartitionsMap.put(targetTopic, partitions);
    }

When the Kafka topic is expanded, the new partitions are not discovered. For example, when expanding from 1 partition to 2 partitions, the cached partitions array is never updated until the job restarts. Are there plans to improve this feature, or is there any other way to achieve this?
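To make the problem concrete, here is a minimal, Flink-free sketch of the same fetch-once caching pattern the producer snippet above uses. The broker map and class name are hypothetical stand-ins, not Flink APIs; the point is that once the partition array is cached, expanding the topic is invisible until the cache entry is dropped:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionCacheDemo {
    // Simulated "broker" state: current partition count per topic.
    public static Map<String, Integer> broker = new HashMap<>();

    // Stand-in for getPartitionsByTopic(): asks the "broker" for the
    // current partition ids of a topic.
    public static int[] getPartitionsByTopic(String topic) {
        int n = broker.get(topic);
        int[] partitions = new int[n];
        for (int i = 0; i < n; i++) partitions[i] = i;
        return partitions;
    }

    // Mirrors the producer's topicPartitionsMap: fetch once, cache forever.
    public static Map<String, int[]> topicPartitionsMap = new HashMap<>();

    public static int[] cachedLookup(String topic) {
        int[] partitions = topicPartitionsMap.get(topic);
        if (null == partitions) {
            partitions = getPartitionsByTopic(topic);
            topicPartitionsMap.put(topic, partitions);
        }
        return partitions;
    }

    public static void main(String[] args) {
        broker.put("sink-topic", 1);
        System.out.println(cachedLookup("sink-topic").length); // sees 1 partition

        // "Expand" the topic to 2 partitions; the cache never notices.
        broker.put("sink-topic", 2);
        System.out.println(cachedLookup("sink-topic").length); // still sees 1

        // A possible workaround: invalidate the cache entry so the next
        // lookup re-fetches the partition list.
        topicPartitionsMap.clear();
        System.out.println(cachedLookup("sink-topic").length); // now sees 2
    }
}
```

This suggests that, absent built-in support, any workaround has to force a re-fetch of the partition list (e.g. by periodically invalidating the cached entry), since nothing in the lookup path above ever refreshes it.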