Re: Flink job消费kafka 失败,无法拿到offset值

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Re: Flink job消费kafka 失败,无法拿到offset值

Qingsheng Ren
你好 Jacob,

从错误上来看是 Kafka Consumer 没有连上 Kafka Brokers。这些方法可能帮助排查问题:

1. 确认 Flink TaskManager 和 Kafka Broker 之间的网络连通性。
2. Flink TaskManager 与 Kafka Broker 之间网络连通并不意味着能够消费数据,可能需要修改 Kafka Broker 的配置。这篇文章[1] 或许会有帮助,绝大多数 Kafka 的连接问题是由于文章中描述的配置问题导致的。
3. 配置 Log4j 将 org.apache.kafka.clients.consumer 的 Log level 配置为 DEBUG 或 TRACE,在日志中获取到更多的信息以帮助排查。

希望有所帮助!

[1] https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/


Best Regards,

Qingsheng Ren
在 2021年4月14日 +0800 PM12:13,Jacob <[hidden email]>,写道:

> 有一个flink job在消费kafka topic消息,该topic存在于kafka两个集群cluster A 和Cluster B,Flink
> Job消费A集群的topic一切正常,但切换到消费B集群就启动失败。
>
> Flink 集群采用Docker部署,Standalone模式。集群资源充足,Slot充足。报错日志如下:
>
> java.lang.Exception: org.apache.kafka.common.errors.TimeoutException:
> Timeout of 60000ms expired before the position for partition Test-topic-27
> could be determined
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
> 60000ms expired before the position for partition Test-topic-27 could be
> determined
>
> 查询一圈发现基本都是说slot不够之类的原因,已经kafka broker负载等问题,这些问题已经排除。
>
> 请指教
>
>
>
> -----
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Flink job消费kafka 失败,无法拿到offset值

wysstartgo
应该是flink 连接不上kafka ,建议往docker 的网络设置上找找看问题

发自我的iPhone

> 在 2021年4月23日,下午12:56,Qingsheng Ren <[hidden email]> 写道:
>
> 你好 Jacob,
>
> 从错误上来看是 Kafka Consumer 没有连上 Kafka Brokers。这些方法可能帮助排查问题:
>
> 1. 确认 Flink TaskManager 和 Kafka Broker 之间的网络连通性。
> 2. Flink TaskManager 与 Kafka Broker 之间网络连通并不意味着能够消费数据,可能需要修改 Kafka Broker 的配置。这篇文章[1] 或许会有帮助,绝大多数 Kafka 的连接问题是由于文章中描述的配置问题导致的。
> 3. 配置 Log4j 将 org.apache.kafka.clients.consumer 的 Log level 配置为 DEBUG 或 TRACE,在日志中获取到更多的信息以帮助排查。
>
> 希望有所帮助!
>
> [1] https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
>
> —
> Best Regards,
>
> Qingsheng Ren
> 在 2021年4月14日 +0800 PM12:13,Jacob <[hidden email]>,写道:
>> 有一个flink job在消费kafka topic消息,该topic存在于kafka两个集群cluster A 和Cluster B,Flink
>> Job消费A集群的topic一切正常,但切换到消费B集群就启动失败。
>>
>> Flink 集群采用Docker部署,Standalone模式。集群资源充足,Slot充足。报错日志如下:
>>
>> java.lang.Exception: org.apache.kafka.common.errors.TimeoutException:
>> Timeout of 60000ms expired before the position for partition Test-topic-27
>> could be determined
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
>> 60000ms expired before the position for partition Test-topic-27 could be
>> determined
>>
>> 查询一圈发现基本都是说slot不够之类的原因,已经kafka broker负载等问题,这些问题已经排除。
>>
>> 请指教
>>
>>
>>
>> -----
>> Thanks!
>> Jacob
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Reply | Threaded
Open this post in threaded view
|

Re: Flink job消费kafka 失败,无法拿到offset值

zhao liang
一个查Flink与集群B的网络连通性,一个是查集群B本身服务状态,比如是所有topic都连不上还是某个topic连不上,是不是有节点挂掉了,另外一个再看看kafka的_offset_consumer是不是单节点正好挂了,如果是单个topic看分区问题。

> 2021年4月23日 18:45,[hidden email] 写道:
>
> 应该是flink 连接不上kafka ,建议往docker 的网络设置上找找看问题
>
> 发自我的iPhone
>
>> 在 2021年4月23日,下午12:56,Qingsheng Ren <[hidden email]> 写道:
>>
>> 你好 Jacob,
>>
>> 从错误上来看是 Kafka Consumer 没有连上 Kafka Brokers。这些方法可能帮助排查问题:
>>
>> 1. 确认 Flink TaskManager 和 Kafka Broker 之间的网络连通性。
>> 2. Flink TaskManager 与 Kafka Broker 之间网络连通并不意味着能够消费数据,可能需要修改 Kafka Broker 的配置。这篇文章[1] 或许会有帮助,绝大多数 Kafka 的连接问题是由于文章中描述的配置问题导致的。
>> 3. 配置 Log4j 将 org.apache.kafka.clients.consumer 的 Log level 配置为 DEBUG 或 TRACE,在日志中获取到更多的信息以帮助排查。
>>
>> 希望有所帮助!
>>
>> [1] https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
>>
>> —
>> Best Regards,
>>
>> Qingsheng Ren
>> 在 2021年4月14日 +0800 PM12:13,Jacob <[hidden email]>,写道:
>>> 有一个flink job在消费kafka topic消息,该topic存在于kafka两个集群cluster A 和Cluster B,Flink
>>> Job消费A集群的topic一切正常,但切换到消费B集群就启动失败。
>>>
>>> Flink 集群采用Docker部署,Standalone模式。集群资源充足,Slot充足。报错日志如下:
>>>
>>> java.lang.Exception: org.apache.kafka.common.errors.TimeoutException:
>>> Timeout of 60000ms expired before the position for partition Test-topic-27
>>> could be determined
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
>>> 60000ms expired before the position for partition Test-topic-27 could be
>>> determined
>>>
>>> 查询一圈发现基本都是说slot不够之类的原因,已经kafka broker负载等问题,这些问题已经排除。
>>>
>>> 请指教
>>>
>>>
>>>
>>> -----
>>> Thanks!
>>> Jacob
>>> --
>>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>
>