Flink version: 1.10.0 (checkpointing is not enabled)
Kafka version: 0.10.2.1
The data source is Kafka.
The code is as follows:
val topicHkList = List(HqKafkaTopic.KAFKA_TOPIC_HK_TRADE_TICKER, HqKafkaTopic.KAFKA_TOPIC_HK_INDEX)
val kafkaHkConsumer: FlinkKafkaConsumer[Tuple3[String, String, String]] =
  new FlinkKafkaConsumer(topicHkList, new CustomKafkaDeserializationSchema(), properties)
// Configure the position from which the Kafka consumer starts reading
kafkaHkConsumer.setStartFromLatest()
val sourseHk = env
  .addSource(kafkaHkConsumer).name("hk kafka source")
  .map(new HkKafkaDecodeMap)
  .map(new HkKafkaObj2HqMap)
  .map(new HkMsgPushDecodeMap)
  .filter(new HkMsgPushFilter)
When the job consumes data, no records come out. With debug logging enabled, the following is printed:
consumer.internals.Fetcher - Ignoring fetched records for hq.hk-index-topic-new-0 at offset 6921349 since the current position is 6921364
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-tradeTicker-topic-new-0, hq.hk-index-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716799 since the current position is 12716919
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716919 since the current position is 12717048
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12717048 since the current position is 12717071
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
This seems to be related to offsets. What is the cause? Is there something I need to configure in my code?
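
To make the pattern in the debug log easier to see, here is a minimal sketch that extracts the fetched offset and the current position from each "Ignoring fetched records" line and prints the difference. The object name IgnoredFetchGaps and the Gap case class are hypothetical helpers written only for reading this log; they are not part of the job or of any Kafka or Flink API.

```scala
// Hypothetical helper for reading the Fetcher debug lines; not part of the Flink job.
object IgnoredFetchGaps {
  // Matches lines of the form:
  // "... Ignoring fetched records for <topic-partition> at offset <n> since the current position is <m>"
  private val Ignored =
    """.*Ignoring fetched records for (\S+) at offset (\d+) since the current position is (\d+).*""".r

  final case class Gap(partition: String, fetchedOffset: Long, position: Long) {
    // How far the consumer position is ahead of the offset of the ignored fetch
    def delta: Long = position - fetchedOffset
  }

  // Returns Some(Gap) for an "Ignoring fetched records" line, None for any other line
  def parse(line: String): Option[Gap] = line match {
    case Ignored(tp, fetched, pos) => Some(Gap(tp, fetched.toLong, pos.toLong))
    case _                         => None
  }

  def main(args: Array[String]): Unit = {
    // Lines copied from the debug log in this post
    val lines = List(
      "consumer.internals.Fetcher - Ignoring fetched records for hq.hk-index-topic-new-0 at offset 6921349 since the current position is 6921364",
      "consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-tradeTicker-topic-new-0, hq.hk-index-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)",
      "consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716799 since the current position is 12716919",
      "consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716919 since the current position is 12717048",
      "consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12717048 since the current position is 12717071"
    )
    lines.flatMap(parse).foreach { g =>
      println(s"${g.partition}: fetched=${g.fetchedOffset} position=${g.position} delta=${g.delta}")
    }
  }
}
```

For hq.hk-tradeTicker-topic-new-0, each ignored fetch starts exactly at the position reported on the previous "Ignoring" line (12716919, then 12717048), so the position keeps advancing while every fetch response arrives one step behind it.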