Flink Kafka Connector相关问题

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink Kafka Connector相关问题

戴鑫铉
您好,这次发邮件主要想请教一下关于Flink Kafka Connector的一些问题:

1、在官方文档中有提到关闭checkpoint,flink仍会定时提交offset。但经过测试和观察源码,发现只有FlinkKafkaConsumer08会在createFetcher时就创建一个定期提交offset的committer,而FlinkKafkaConsumer09和FlinkKafkaConsumer010似乎只能依赖commitInternalOffsetsToKafka()方法提交Offset,该方法只有在checkpoint打开的情况才会被调用。这是否意味着使用FlinkKafkaConsumer09必须开启checkpoint机制,不然不会定期提交offset至kafka呢?

2、还想问下flink kafka
connector中的currentoffset和commitoffset,currentoffset-commitoffset就算lag值嘛,官方文档有提到监控lag值,但是我个人感觉currentoffset和kafka内的Log
End Offset不是一回事啊?能请详细解释一下吗?
Reply | Threaded
Open this post in threaded view
|

Re: Flink Kafka Connector相关问题

Wong Victor
Hi 鑫铉:
  我尝试解答下;
 
  1. 这是否意味着使用FlinkKafkaConsumer09必须开启checkpoint机制,不然不会定期提交offset至kafka呢?
  根据官方文档  [1],checkpoint offset是Flink的功能,auto commit offset是kafka client的功能;这俩功能在作用上有一定重叠,所以根据配置会有不同的行为表现;
  如果Flink没有开启checkpoint,那么offset的保存依赖于kafka client的auto commit offset;
  如果Flink开启了checkpoint,那么auto commit offset会被Flink禁用;在flink完成checkpoint之后,根据用户配置决定是否提交offset到zookeeper (kafka 0.8) 或 kafka broker ( kafka 0.8+);
  结论:如果不开启checkpoint,只要kafka properties中配置了 " enable.auto.commit"为true 和 " auto.commit.interval.ms"大于0,就能定期提交offset到kafka;

  2. current-offsets、committed-offsets、consumer lag;
  根据官方文档 [2],
  current-offsets是当前Flink读取到的最新offset;
  committed-offsets是提交到zookeeper/kafka broker 的offset;
  consumer lag是指topic最新的offset(log end offset) 和 committed-offsets的差值;Flink没有提供consumer lag信息,该信息依赖于kafka及其相关运维工具生成;

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-connector-metrics


On 2019/8/22, 7:21 PM, "戴鑫铉" <[hidden email]> wrote:

    您好,这次发邮件主要想请教一下关于Flink Kafka Connector的一些问题:
   
    1、在官方文档中有提到关闭checkpoint,flink仍会定时提交offset。但经过测试和观察源码,发现只有FlinkKafkaConsumer08会在createFetcher时就创建一个定期提交offset的committer,而FlinkKafkaConsumer09和FlinkKafkaConsumer010似乎只能依赖commitInternalOffsetsToKafka()方法提交Offset,该方法只有在checkpoint打开的情况才会被调用。这是否意味着使用FlinkKafkaConsumer09必须开启checkpoint机制,不然不会定期提交offset至kafka呢?
   
    2、还想问下flink kafka
    connector中的currentoffset和commitoffset,currentoffset-commitoffset就算lag值嘛,官方文档有提到监控lag值,但是我个人感觉currentoffset和kafka内的Log
    End Offset不是一回事啊?能请详细解释一下吗?
   

Reply | Threaded
Open this post in threaded view
|

Re: Flink Kafka Connector相关问题

戴鑫铉
Hi Victor:
   您的回复已收到,谢谢您详细的解答!非常感谢!

Victor Wong <[hidden email]> 于2019年8月23日周五 上午10:20写道:

> Hi 鑫铉:
>   我尝试解答下;
>
>   1. 这是否意味着使用FlinkKafkaConsumer09必须开启checkpoint机制,不然不会定期提交offset至kafka呢?
>   根据官方文档  [1],checkpoint offset是Flink的功能,auto commit offset是kafka
> client的功能;这俩功能在作用上有一定重叠,所以根据配置会有不同的行为表现;
>   如果Flink没有开启checkpoint,那么offset的保存依赖于kafka client的auto commit offset;
>   如果Flink开启了checkpoint,那么auto commit
> offset会被Flink禁用;在flink完成checkpoint之后,根据用户配置决定是否提交offset到zookeeper (kafka
> 0.8) 或 kafka broker ( kafka 0.8+);
>   结论:如果不开启checkpoint,只要kafka properties中配置了 " enable.auto.commit"为true 和 "
> auto.commit.interval.ms"大于0,就能定期提交offset到kafka;
>
>   2. current-offsets、committed-offsets、consumer lag;
>   根据官方文档 [2],
>   current-offsets是当前Flink读取到的最新offset;
>   committed-offsets是提交到zookeeper/kafka broker 的offset;
>   consumer lag是指topic最新的offset(log end offset) 和
> committed-offsets的差值;Flink没有提供consumer lag信息,该信息依赖于kafka及其相关运维工具生成;
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-connector-metrics
>
>
> On 2019/8/22, 7:21 PM, "戴鑫铉" <[hidden email]> wrote:
>
>     您好,这次发邮件主要想请教一下关于Flink Kafka Connector的一些问题:
>
>
> 1、在官方文档中有提到关闭checkpoint,flink仍会定时提交offset。但经过测试和观察源码,发现只有FlinkKafkaConsumer08会在createFetcher时就创建一个定期提交offset的committer,而FlinkKafkaConsumer09和FlinkKafkaConsumer010似乎只能依赖commitInternalOffsetsToKafka()方法提交Offset,该方法只有在checkpoint打开的情况才会被调用。这是否意味着使用FlinkKafkaConsumer09必须开启checkpoint机制,不然不会定期提交offset至kafka呢?
>
>     2、还想问下flink kafka
>
> connector中的currentoffset和commitoffset,currentoffset-commitoffset就算lag值嘛,官方文档有提到监控lag值,但是我个人感觉currentoffset和kafka内的Log
>     End Offset不是一回事啊?能请详细解释一下吗?
>
>
>