Flink任务中写Kafka超时导致任务超时

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Flink任务中写Kafka超时导致任务超时

Frost Wong
各位好,

我的Flink任务中有写Kafka的操作,最近发现当写的这个Kafka整体QPS很高时,响应时间也会很长。我的核心逻辑是继承了RichFlatMapFunction,在flatMap方法中调用KafkaProducer.produce(),在方法最后调用KafkaProducer.flush()。

我知道调用KafkaProducer.flush()可能会很耗时,但我的业务不允许数据丢失,我考虑把KafkaProducer.flush()放在RichFlatMapFunction的close()方法中。不知道这个close()方法是什么时候调用的。

我的问题是:


  1.  主动cancel任务的时候会调用close()方法吗?如果这时KafkaProducer.flush()卡住,任务就cancel不掉了,最终数据会丢失
  2.  由于其他异常(如网络超时)等导致任务重启时会调用close()方法吗?

谢谢大家!