Flink 读取 Kafka多个 Partition问题,

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 读取 Kafka多个 Partition问题,

Qijun Feng
Dear All,

我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,  现在改成了所有地址,也换了 group.id


Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "10.216.85.201:9092,10.216.77.170:9092,10.216.77.188:9092");
properties.setProperty("group.id", "behavior-logs-aggregator");

FlinkKafkaConsumer010<BehaviorLog> kafkaConsumer010 =
       new FlinkKafkaConsumer010<BehaviorLog>("behavior-logs_dev", new BehaviorLogDeserializationSchema(), properties);
kafkaConsumer010.setStartFromTimestamp(1585670400000L); //2020/04/01

处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者 2 的,

2020-04-02 14:54:58,532 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.


是哪里有问题吗?

Reply | Threaded
Open this post in threaded view
|

Re: Flink 读取 Kafka 多个 Partition 问题,

LakeShen
Hi Qijun,

看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:1585670400000L,后面是不是只有分区3写入数据,个人的想法。

Best,
LakeShen

Qijun Feng <[hidden email]> 于2020年4月2日周四 下午5:44写道:

> Dear All,
>
> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
>  现在改成了所有地址,也换了 group.id
>
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> 10.216.77.170:9092,10.216.77.188:9092");
> properties.setProperty("group.id", "behavior-logs-aggregator");
>
> FlinkKafkaConsumer010<BehaviorLog> kafkaConsumer010 =
>        new FlinkKafkaConsumer010<BehaviorLog>("behavior-logs_dev", new
> BehaviorLogDeserializationSchema(), properties);
> kafkaConsumer010.setStartFromTimestamp(1585670400000L); //2020/04/01
>
> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者 2
> 的,
>
> 2020-04-02 14:54:58,532 INFO
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> Consumer subtask 0 creating fetcher with offsets
> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>
>
> 是哪里有问题吗?
>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Flink 读取 Kafka 多个 Partition 问题,

Lynn Chen






hi,  Qijun Feng:


我也遇到了类似的问题, 请问您后来是怎么解决的哈?

















在 2020-04-03 09:27:52,"LakeShen" <[hidden email]> 写道:

>Hi Qijun,
>
>看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:1585670400000L,后面是不是只有分区3写入数据,个人的想法。
>
>Best,
>LakeShen
>
>Qijun Feng <[hidden email]> 于2020年4月2日周四 下午5:44写道:
>
>> Dear All,
>>
>> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
>> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
>>  现在改成了所有地址,也换了 group.id
>>
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> 10.216.77.170:9092,10.216.77.188:9092");
>> properties.setProperty("group.id", "behavior-logs-aggregator");
>>
>> FlinkKafkaConsumer010<BehaviorLog> kafkaConsumer010 =
>>        new FlinkKafkaConsumer010<BehaviorLog>("behavior-logs_dev", new
>> BehaviorLogDeserializationSchema(), properties);
>> kafkaConsumer010.setStartFromTimestamp(1585670400000L); //2020/04/01
>>
>> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者 2
>> 的,
>>
>> 2020-04-02 14:54:58,532 INFO
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> Consumer subtask 0 creating fetcher with offsets
>> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>>
>>
>> 是哪里有问题吗?
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink 读取 Kafka 多个 Partition 问题,

zhisheng
hi

如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema
来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。

eg:

      env.addSource(new FlinkKafkaConsumer011<>(
parameters.get("topic"),                new
JSONKeyValueDeserializationSchema(true),
buildKafkaProps(parameters)))                .flatMap(new
FlatMapFunction<ObjectNode, ObjectNode>() {
@Override                    public void flatMap(ObjectNode jsonNodes,
Collector<ObjectNode> collector) throws Exception {
    System.out.println(jsonNodes.get("value"));
System.out.println(jsonNodes.get("metadata").get("topic").asText());

System.out.println(jsonNodes.get("metadata").get("offset").asText());

System.out.println(jsonNodes.get("metadata").get("partition").asText());
                       collector.collect(jsonNodes);
    }                })                .print();

Best

zhisheng


Lynn Chen <[hidden email]> 于2020年10月23日周五 上午12:13写道:

>
>
>
>
>
>
> hi,  Qijun Feng:
>
>
> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-03 09:27:52,"LakeShen" <[hidden email]> 写道:
> >Hi Qijun,
> >
> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:1585670400000L,后面是不是只有分区3写入数据,个人的想法。
> >
> >Best,
> >LakeShen
> >
> >Qijun Feng <[hidden email]> 于2020年4月2日周四 下午5:44写道:
> >
> >> Dear All,
> >>
> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
> >>  现在改成了所有地址,也换了 group.id
> >>
> >>
> >> Properties properties = new Properties();
> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> >> 10.216.77.170:9092,10.216.77.188:9092");
> >> properties.setProperty("group.id", "behavior-logs-aggregator");
> >>
> >> FlinkKafkaConsumer010<BehaviorLog> kafkaConsumer010 =
> >>        new FlinkKafkaConsumer010<BehaviorLog>("behavior-logs_dev", new
> >> BehaviorLogDeserializationSchema(), properties);
> >> kafkaConsumer010.setStartFromTimestamp(1585670400000L); //2020/04/01
> >>
> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者
> 2
> >> 的,
> >>
> >> 2020-04-02 14:54:58,532 INFO
> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> >> Consumer subtask 0 creating fetcher with offsets
> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
> >>
> >>
> >> 是哪里有问题吗?
> >>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Flink 读取 Kafka 多个 Partition 问题,

Lynn Chen



hi, zhisheng:


我解析 json 后:
(xxx, xxx, xxx, topic, partition, offset)
=>


(false,1603420582310,"INSERT","test3.order",2,75)
(false,1603421312803,"INSERT","test3.order",2,76)
(false,1603421344819,"INSERT","test3.order",2,77)
(false,1603421344819,"INSERT","test3.order",2,78)


我增加十几条数据,   拿到的都是 partition 2 的数据(4 条),  1跟 3 的没有拿到


我的猜想:


我做了一个 9797 外网端口, 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)


broker1 配置:


listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT


broker2 配置:


listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT







broker3 配置:


listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT


本机连接kafka:
properties.setProperty("bootstrap.servers", "xxx-b-1:9797")


是跟这个配置有关吗?










在 2020-10-23 08:37:14,"zhisheng" <[hidden email]> 写道:

>hi
>
>如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema
>来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。
>
>eg:
>
>      env.addSource(new FlinkKafkaConsumer011<>(
>parameters.get("topic"),                new
>JSONKeyValueDeserializationSchema(true),
>buildKafkaProps(parameters)))                .flatMap(new
>FlatMapFunction<ObjectNode, ObjectNode>() {
>@Override                    public void flatMap(ObjectNode jsonNodes,
>Collector<ObjectNode> collector) throws Exception {
>    System.out.println(jsonNodes.get("value"));
>System.out.println(jsonNodes.get("metadata").get("topic").asText());
>
>System.out.println(jsonNodes.get("metadata").get("offset").asText());
>
>System.out.println(jsonNodes.get("metadata").get("partition").asText());
>                       collector.collect(jsonNodes);
>    }                })                .print();
>
>Best
>
>zhisheng
>
>
>Lynn Chen <[hidden email]> 于2020年10月23日周五 上午12:13写道:
>
>>
>>
>>
>>
>>
>>
>> hi,  Qijun Feng:
>>
>>
>> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-04-03 09:27:52,"LakeShen" <[hidden email]> 写道:
>> >Hi Qijun,
>> >
>> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:1585670400000L,后面是不是只有分区3写入数据,个人的想法。
>> >
>> >Best,
>> >LakeShen
>> >
>> >Qijun Feng <[hidden email]> 于2020年4月2日周四 下午5:44写道:
>> >
>> >> Dear All,
>> >>
>> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
>> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
>> >>  现在改成了所有地址,也换了 group.id
>> >>
>> >>
>> >> Properties properties = new Properties();
>> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> >> 10.216.77.170:9092,10.216.77.188:9092");
>> >> properties.setProperty("group.id", "behavior-logs-aggregator");
>> >>
>> >> FlinkKafkaConsumer010<BehaviorLog> kafkaConsumer010 =
>> >>        new FlinkKafkaConsumer010<BehaviorLog>("behavior-logs_dev", new
>> >> BehaviorLogDeserializationSchema(), properties);
>> >> kafkaConsumer010.setStartFromTimestamp(1585670400000L); //2020/04/01
>> >>
>> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者
>> 2
>> >> 的,
>> >>
>> >> 2020-04-02 14:54:58,532 INFO
>> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> >> Consumer subtask 0 creating fetcher with offsets
>> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>> >>
>> >>
>> >> 是哪里有问题吗?
>> >>
>> >>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: Flink 读取 Kafka 多个 Partition 问题,

zhisheng
hi

既然你只能消费到一个分区的数据,那么可以肯定的是消费能拿到的只是一个分区的数据,另外看到你说

> 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)

建议看看是不是这个转发有问题,只转发了一个节点

Best
zhisheng

Lynn Chen <[hidden email]> 于2020年10月23日周五 上午11:01写道:

>
>
>
> hi, zhisheng:
>
>
> 我解析 json 后:
> (xxx, xxx, xxx, topic, partition, offset)
> =>
>
>
> (false,1603420582310,"INSERT","test3.order",2,75)
> (false,1603421312803,"INSERT","test3.order",2,76)
> (false,1603421344819,"INSERT","test3.order",2,77)
> (false,1603421344819,"INSERT","test3.order",2,78)
>
>
> 我增加十几条数据,   拿到的都是 partition 2 的数据(4 条),  1跟 3 的没有拿到
>
>
> 我的猜想:
>
>
> 我做了一个 9797 外网端口, 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)
>
>
> broker1 配置:
>
>
> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
> broker2 配置:
>
>
> listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
>
>
>
>
>
> broker3 配置:
>
>
> listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
> 本机连接kafka:
> properties.setProperty("bootstrap.servers", "xxx-b-1:9797")
>
>
> 是跟这个配置有关吗?
>
>
>
>
>
>
>
>
>
>
> 在 2020-10-23 08:37:14,"zhisheng" <[hidden email]> 写道:
> >hi
> >
> >如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema
> >来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。
> >
> >eg:
> >
> >      env.addSource(new FlinkKafkaConsumer011<>(
> >parameters.get("topic"),                new
> >JSONKeyValueDeserializationSchema(true),
> >buildKafkaProps(parameters)))                .flatMap(new
> >FlatMapFunction<ObjectNode, ObjectNode>() {
> >@Override                    public void flatMap(ObjectNode jsonNodes,
> >Collector<ObjectNode> collector) throws Exception {
> >    System.out.println(jsonNodes.get("value"));
> >System.out.println(jsonNodes.get("metadata").get("topic").asText());
> >
> >System.out.println(jsonNodes.get("metadata").get("offset").asText());
> >
> >System.out.println(jsonNodes.get("metadata").get("partition").asText());
> >                       collector.collect(jsonNodes);
> >    }                })                .print();
> >
> >Best
> >
> >zhisheng
> >
> >
> >Lynn Chen <[hidden email]> 于2020年10月23日周五 上午12:13写道:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >> hi,  Qijun Feng:
> >>
> >>
> >> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-04-03 09:27:52,"LakeShen" <[hidden email]> 写道:
> >> >Hi Qijun,
> >> >
> >> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:1585670400000L,后面是不是只有分区3写入数据,个人的想法。
> >> >
> >> >Best,
> >> >LakeShen
> >> >
> >> >Qijun Feng <[hidden email]> 于2020年4月2日周四 下午5:44写道:
> >> >
> >> >> Dear All,
> >> >>
> >> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
> >> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
> >> >>  现在改成了所有地址,也换了 group.id
> >> >>
> >> >>
> >> >> Properties properties = new Properties();
> >> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> >> >> 10.216.77.170:9092,10.216.77.188:9092");
> >> >> properties.setProperty("group.id", "behavior-logs-aggregator");
> >> >>
> >> >> FlinkKafkaConsumer010<BehaviorLog> kafkaConsumer010 =
> >> >>        new FlinkKafkaConsumer010<BehaviorLog>("behavior-logs_dev",
> new
> >> >> BehaviorLogDeserializationSchema(), properties);
> >> >> kafkaConsumer010.setStartFromTimestamp(1585670400000L); //2020/04/01
> >> >>
> >> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有
> partiton=1,或者
> >> 2
> >> >> 的,
> >> >>
> >> >> 2020-04-02 14:54:58,532 INFO
> >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> >> >> Consumer subtask 0 creating fetcher with offsets
> >> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
> >> >>
> >> >>
> >> >> 是哪里有问题吗?
> >> >>
> >> >>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Re: Flink 读取 Kafka 多个 Partition 问题,

Lynn Chen



hi ,all


我的问题解决了.   出现该问题的原因如下:


因为通过堡垒机端口转发, 所以需要在bootstrap.servers 写上所有 kafka borker即可


1. 修改 kafka 外网配置
>> broker1 配置:
>> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT

对应的broker2  broker3 分别改为 9798  9799



2. 让运维对三个端口进行映射转发
xxx-b-1:9797 --> xxx-a-1:9797
xxx-b-1:9798 --> xxx-a-2:9798
xxx-b-1:9799 --> xxx-a-3:9799



3. properties.setProperty("bootstrap.servers", "xxx-b-1:9797,xxx-b-1:9798,xxx-b-1:9799")





确实是少转发了数据节点, 导致的只能读取一个节点的数据  (正好这个节点对应的 partition 是 2,  才反应过来可能是少转发节点的问题)


感谢 zhisheng 的帮助!



在 2020-10-23 11:56:08,"zhisheng" <[hidden email]> 写道:

>hi
>
>既然你只能消费到一个分区的数据,那么可以肯定的是消费能拿到的只是一个分区的数据,另外看到你说
>
>> 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)
>
>建议看看是不是这个转发有问题,只转发了一个节点
>
>Best
>zhisheng
>
>Lynn Chen <[hidden email]> 于2020年10月23日周五 上午11:01写道:
>
>>
>>
>>
>> hi, zhisheng:
>>
>>
>> 我解析 json 后:
>> (xxx, xxx, xxx, topic, partition, offset)
>> =>
>>
>>
>> (false,1603420582310,"INSERT","test3.order",2,75)
>> (false,1603421312803,"INSERT","test3.order",2,76)
>> (false,1603421344819,"INSERT","test3.order",2,77)
>> (false,1603421344819,"INSERT","test3.order",2,78)
>>
>>
>> 我增加十几条数据,   拿到的都是 partition 2 的数据(4 条),  1跟 3 的没有拿到
>>
>>
>> 我的猜想:
>>
>>
>> 我做了一个 9797 外网端口, 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)
>>
>>
>> broker1 配置:
>>
>>
>> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>>
>> broker2 配置:
>>
>>
>> listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>>
>>
>>
>>
>>
>>
>> broker3 配置:
>>
>>
>> listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>>
>> 本机连接kafka:
>> properties.setProperty("bootstrap.servers", "xxx-b-1:9797")
>>
>>
>> 是跟这个配置有关吗?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-10-23 08:37:14,"zhisheng" <[hidden email]> 写道:
>> >hi
>> >
>> >如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema
>> >来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。
>> >
>> >eg:
>> >
>> >      env.addSource(new FlinkKafkaConsumer011<>(
>> >parameters.get("topic"),                new
>> >JSONKeyValueDeserializationSchema(true),
>> >buildKafkaProps(parameters)))                .flatMap(new
>> >FlatMapFunction<ObjectNode, ObjectNode>() {
>> >@Override                    public void flatMap(ObjectNode jsonNodes,
>> >Collector<ObjectNode> collector) throws Exception {
>> >    System.out.println(jsonNodes.get("value"));
>> >System.out.println(jsonNodes.get("metadata").get("topic").asText());
>> >
>> >System.out.println(jsonNodes.get("metadata").get("offset").asText());
>> >
>> >System.out.println(jsonNodes.get("metadata").get("partition").asText());
>> >                       collector.collect(jsonNodes);
>> >    }                })                .print();
>> >
>> >Best
>> >
>> >zhisheng
>> >
>> >
>> >Lynn Chen <[hidden email]> 于2020年10月23日周五 上午12:13写道:
>> >
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> hi,  Qijun Feng:
>> >>
>> >>
>> >> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-04-03 09:27:52,"LakeShen" <[hidden email]> 写道:
>> >> >Hi Qijun,
>> >> >
>> >> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:1585670400000L,后面是不是只有分区3写入数据,个人的想法。
>> >> >
>> >> >Best,
>> >> >LakeShen
>> >> >
>> >> >Qijun Feng <[hidden email]> 于2020年4月2日周四 下午5:44写道:
>> >> >
>> >> >> Dear All,
>> >> >>
>> >> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
>> >> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
>> >> >>  现在改成了所有地址,也换了 group.id
>> >> >>
>> >> >>
>> >> >> Properties properties = new Properties();
>> >> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> >> >> 10.216.77.170:9092,10.216.77.188:9092");
>> >> >> properties.setProperty("group.id", "behavior-logs-aggregator");
>> >> >>
>> >> >> FlinkKafkaConsumer010<BehaviorLog> kafkaConsumer010 =
>> >> >>        new FlinkKafkaConsumer010<BehaviorLog>("behavior-logs_dev",
>> new
>> >> >> BehaviorLogDeserializationSchema(), properties);
>> >> >> kafkaConsumer010.setStartFromTimestamp(1585670400000L); //2020/04/01
>> >> >>
>> >> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有
>> partiton=1,或者
>> >> 2
>> >> >> 的,
>> >> >>
>> >> >> 2020-04-02 14:54:58,532 INFO
>> >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> >> >> Consumer subtask 0 creating fetcher with offsets
>> >> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>> >> >>
>> >> >>
>> >> >> 是哪里有问题吗?
>> >> >>
>> >> >>
>> >>
>>