flink消费kafka数据,如何从kafka端获取消费偏移信息

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

flink消费kafka数据,如何从kafka端获取消费偏移信息

wqpapa
flink通过FlinkKafkaConsumer消费kafka主题,设置group.id按setStartFromGroupOffsets取偏移,如何在kafka端获取对应group.id的偏移信息?
kafka为0.9版本,在zk那边找不到对应的group.id;通过kafka-consumer-groups.sh --describe  --group ,也取不到。请问下要怎么获取啊?
Reply | Threaded
Open this post in threaded view
|

Re: flink消费kafka数据,如何从kafka端获取消费偏移信息

Benchao Li
setStartFromGroupOffsets
说的是从kafka里面保存的offset开始消费,flink并没有保证kafka一定会有这个consumer的offset,如果没有的话,会按照kafka
consumer的配置'auto.offset.reset'来设置这些partition的offset。

具体情况可以参考文档[1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

wqpapa <[hidden email]> 于2020年1月12日周日 下午9:09写道:

> flink通过FlinkKafkaConsumer消费kafka主题,设置group.id
> 按setStartFromGroupOffsets取偏移,如何在kafka端获取对应group.id的偏移信息?
> kafka为0.9版本,在zk那边找不到对应的group.id;通过kafka-consumer-groups.sh --describe
> --group ,也取不到。请问下要怎么获取啊?



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re:Re: flink消费kafka数据,如何从kafka端获取消费偏移信息

wqpapa
感谢回复!主要想了解下如何从kafka端后台命令方式获取对应消费组的偏移信息。之前通过普通的java代码消费kafka数据,通过kafka-consumer-groups.sh --describe可获取到消费偏移信息,但通过flink消费,不知道要怎么在kafka端获取偏移信息?









在 2020-01-12 21:17:40,"Benchao Li" <[hidden email]> 写道:

>setStartFromGroupOffsets
>说的是从kafka里面保存的offset开始消费,flink并没有保证kafka一定会有这个consumer的offset,如果没有的话,会按照kafka
>consumer的配置'auto.offset.reset'来设置这些partition的offset。
>
>具体情况可以参考文档[1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
>wqpapa <[hidden email]> 于2020年1月12日周日 下午9:09写道:
>
>> flink通过FlinkKafkaConsumer消费kafka主题,设置group.id
>> 按setStartFromGroupOffsets取偏移,如何在kafka端获取对应group.id的偏移信息?
>> kafka为0.9版本,在zk那边找不到对应的group.id;通过kafka-consumer-groups.sh --describe
>> --group ,也取不到。请问下要怎么获取啊?
>
>
>
>--
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink消费kafka数据,如何从kafka端获取消费偏移信息

Benchao Li
Flink也会commit offset到Kafka的[1],当然在开checkpoint和不开checkpoint的时候表现有些不同。

offset
只要是commit到了kafka,查看offset的方式我理解跟用其他方式commit的offset的查阅方式应该没有区别的。如果你看不到flink消费的topic对应的offset,可能需要check下是什么原因没有commit
offset.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

wqpapa <[hidden email]> 于2020年1月12日周日 下午9:34写道:

> 感谢回复!主要想了解下如何从kafka端后台命令方式获取对应消费组的偏移信息。之前通过普通的java代码消费kafka数据,通过kafka-consumer-groups.sh
> --describe可获取到消费偏移信息,但通过flink消费,不知道要怎么在kafka端获取偏移信息?
>
>
>
>
>
>
>
>
>
> 在 2020-01-12 21:17:40,"Benchao Li" <[hidden email]> 写道:
> >setStartFromGroupOffsets
>
> >说的是从kafka里面保存的offset开始消费,flink并没有保证kafka一定会有这个consumer的offset,如果没有的话,会按照kafka
> >consumer的配置'auto.offset.reset'来设置这些partition的offset。
> >
> >具体情况可以参考文档[1].
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
> >
> >wqpapa <[hidden email]> 于2020年1月12日周日 下午9:09写道:
> >
> >> flink通过FlinkKafkaConsumer消费kafka主题,设置group.id
> >> 按setStartFromGroupOffsets取偏移,如何在kafka端获取对应group.id的偏移信息?
> >> kafka为0.9版本,在zk那边找不到对应的group.id;通过kafka-consumer-groups.sh --describe
> >> --group ,也取不到。请问下要怎么获取啊?
> >
> >
> >
> >--
> >
> >Benchao Li
> >School of Electronics Engineering and Computer Science, Peking University
> >Tel:+86-15650713730
> >Email: [hidden email]; [hidden email]
>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

回复: Re: flink消费kafka数据,如何从kafka端获取消费偏移信息

Evan
命令行查看kafka消费的Offset命令如下:
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zkhost:2181 --group ${group.id} --topic ${topic_name}
替换zkhost 、group.id及topic_name即可
然后就会得到
Group&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Topic&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Pid Offset&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logSize&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Lag&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Owner
test&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; dy_event&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;0&nbsp; &nbsp;8115733&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;10658588&nbsp; &nbsp; &nbsp; &nbsp; 2542855&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;none
test&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; dy_event&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;1&nbsp; &nbsp;8114221&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;10658585&nbsp; &nbsp; &nbsp; &nbsp; 2544364&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;none
test&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; dy_event&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;2&nbsp; &nbsp;8115173&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;10658587&nbsp; &nbsp; &nbsp; &nbsp; 2543414&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;none
test&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; dy_event&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;3&nbsp; &nbsp;8115127&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;10658585&nbsp; &nbsp; &nbsp; &nbsp; 2543458&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;none
test&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; dy_event&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;4&nbsp; &nbsp;8115160&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;10658587&nbsp; &nbsp; &nbsp; &nbsp; 2543427&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;none



这个pid Offset就是每个分区你对应消费组的偏移信息




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
发送时间:&nbsp;2020年1月12日(星期天) 晚上9:46
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: Re: flink消费kafka数据,如何从kafka端获取消费偏移信息



Flink也会commit offset到Kafka的[1],当然在开checkpoint和不开checkpoint的时候表现有些不同。

offset
只要是commit到了kafka,查看offset的方式我理解跟用其他方式commit的offset的查阅方式应该没有区别的。如果你看不到flink消费的topic对应的offset,可能需要check下是什么原因没有commit
offset.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

wqpapa <[hidden email]&gt; 于2020年1月12日周日 下午9:34写道:

&gt; 感谢回复!主要想了解下如何从kafka端后台命令方式获取对应消费组的偏移信息。之前通过普通的java代码消费kafka数据,通过kafka-consumer-groups.sh
&gt; --describe可获取到消费偏移信息,但通过flink消费,不知道要怎么在kafka端获取偏移信息?
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; 在 2020-01-12 21:17:40,"Benchao Li" <[hidden email]&gt; 写道:
&gt; &gt;setStartFromGroupOffsets
&gt;
&gt; &gt;说的是从kafka里面保存的offset开始消费,flink并没有保证kafka一定会有这个consumer的offset,如果没有的话,会按照kafka
&gt; &gt;consumer的配置'auto.offset.reset'来设置这些partition的offset。
&gt; &gt;
&gt; &gt;具体情况可以参考文档[1].
&gt; &gt;
&gt; &gt;[1]
&gt; &gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
&gt; &gt;
&gt; &gt;wqpapa <[hidden email]&gt; 于2020年1月12日周日 下午9:09写道:
&gt; &gt;
&gt; &gt;&gt; flink通过FlinkKafkaConsumer消费kafka主题,设置group.id
&gt; &gt;&gt; 按setStartFromGroupOffsets取偏移,如何在kafka端获取对应group.id的偏移信息?
&gt; &gt;&gt; kafka为0.9版本,在zk那边找不到对应的group.id;通过kafka-consumer-groups.sh --describe
&gt; &gt;&gt; --group ,也取不到。请问下要怎么获取啊?
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;--
&gt; &gt;
&gt; &gt;Benchao Li
&gt; &gt;School of Electronics Engineering and Computer Science, Peking University
&gt; &gt;Tel:+86-15650713730
&gt; &gt;Email: [hidden email]; [hidden email]
&gt;


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]