|
各位好,
我的Flink任务中有写Kafka的操作,最近发现当写的这个Kafka整体QPS很高时,响应时间也会很长。我的核心逻辑是继承了RichFlatMapFunction,在flatMap方法中调用KafkaProducer.produce(),在方法最后调用KafkaProducer.flush()。
我知道调用KafkaProducer.flush()可能会很耗时,但我的业务不允许数据丢失,我考虑把KafkaProducer.flush()放在RichFlatMapFunction的close()方法中。不知道这个close()方法是什么时候调用的。
我的问题是:
1. 主动cancel任务的时候会调用close()方法吗?如果这时KafkaProducer.flush()卡住,任务就cancel不掉了,最终数据会丢失
2. 由于其他异常(如网络超时)等导致任务重启时会调用close()方法吗?
谢谢大家!
|