flink connect kafka


阿西
Flink version: 1.10.0, with checkpointing not enabled
Kafka version: 0.10.2.1
The data source is Kafka.
The code is as follows:
val topicHkList = List(HqKafkaTopic.KAFKA_TOPIC_HK_TRADE_TICKER, HqKafkaTopic.KAFKA_TOPIC_HK_INDEX)
val kafkaHkConsumer: FlinkKafkaConsumer[Tuple3[String, String, String]] =
  new FlinkKafkaConsumer(topicHkList, new CustomKafkaDeserializationSchema(), properties)

// Configure the position from which the Kafka consumer starts reading
kafkaHkConsumer.setStartFromLatest()
val sourseHk = env
  .addSource(kafkaHkConsumer).name("hk kafka source")
  .map(new HkKafkaDecodeMap)
  .map(new HkKafkaObj2HqMap)
  .map(new HkMsgPushDecodeMap)
  .filter(new HkMsgPushFilter)
When the job consumes data, nothing comes out. With DEBUG logging enabled, the following is printed:
consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-index-topic-new-0 at offset 6921349 since the current position is 6921364
consumer.internals.Fetcher  - Sending fetch for partitions [hq.hk-tradeTicker-topic-new-0, hq.hk-index-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716799 since the current position is 12716919
consumer.internals.Fetcher  - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716919 since the current position is 12717048
consumer.internals.Fetcher  - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12717048 since the current position is 12717071
consumer.internals.Fetcher  - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
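
To make the offsets in these lines easier to compare, here is a minimal sketch that extracts the partition, the fetched offset, and the current position from each "Ignoring fetched records" line and computes the difference. The object name FetcherLogGap, the case class IgnoredFetch, and the regular expression are illustrative assumptions based only on the log text above; they are not part of Flink or Kafka.

```scala
import scala.util.matching.Regex

// Hypothetical helper (not a Flink or Kafka API): parses the DEBUG lines shown above.
object FetcherLogGap {
  // Unanchored so the "consumer.internals.Fetcher  - " prefix is skipped.
  private val Ignored: Regex =
    """Ignoring fetched records for (\S+) at offset (\d+) since the current position is (\d+)""".r.unanchored

  final case class IgnoredFetch(partition: String, fetchedOffset: Long, position: Long) {
    // How far the consumer position is ahead of the offset the fetch was issued for.
    def gap: Long = position - fetchedOffset
  }

  // Returns None for lines that are not "Ignoring fetched records" messages.
  def parse(line: String): Option[IgnoredFetch] = line match {
    case Ignored(tp, fetched, pos) => Some(IgnoredFetch(tp, fetched.toLong, pos.toLong))
    case _                         => None
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-index-topic-new-0 at offset 6921349 since the current position is 6921364",
      "consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716799 since the current position is 12716919",
      "consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716919 since the current position is 12717048",
      "consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12717048 since the current position is 12717071"
    )
    lines.flatMap(parse).foreach { f =>
      println(s"${f.partition}: fetched=${f.fetchedOffset} position=${f.position} gap=${f.gap}")
    }
  }
}
```

In the hq.hk-tradeTicker-topic-new-0 lines, each fetched offset equals the position reported in the previous line, so the consumer position is advancing between fetches rather than staying fixed.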

This looks related to offsets. What causes it? Is there something I need to configure in my code?



