|
Hi,大家好:
我在使用 flink kafka sink 时遇到几个问题/疑惑,请教大家。
1. kafka sink 没有像 elasticsearch sink 一样提供一个 ActionRequestFailureHandler,在遇到异常怎么办呢? 而且不确定到底会有哪些异常?
在 FlinkKafkaProducer 的 open中的回调是这样的,onCompletion 只有 RecordMetadata 和 Exception ,不能拿到 Record,而且 callback 是private的,无法通过继承重写
if (logFailuresOnly) {
callback = new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
}
acknowledgeMessage();
}
};
}
如果 kafkaSink.setLogFailuresOnly(true); 那么只打印异常信息,消息将丢失了,消息丢失是不能容忍的,需要写到错误队列,但是拿不到消息。
如果 kafkaSink.setLogFailuresOnly(false) 作业将异常重启,但是又会重复消费,又遇到相同的消息,又异常重启。
注:目前只遇到 消息太大 的异常,默认最大消息为1M,已经调大,但是这样没有治本。
2. 在给 kafka sink 设置了 exactly-once 语义之后,作业默认并行度设置为2,启动后打印一百多次的 kafka config 连接信息,很是疑惑为什么会有这么多?
FlinkKafkaProducer 中有这两个参数,在生成 事务ID时,一个并行度应该只生成 5 * 5 等于 25个事务ID?
public static final int SAFE_SCALE_DOWN_FACTOR = 5;
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
默认的checkpoint并行度是1,所以 poolsize应该可以调小点,但是对 SAFE_SCALE_DOWN_FACTOR 还不是很清楚功能
感谢您
|