修改topic名称后从Savepoint重启会怎么消费Kafka

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

修改topic名称后从Savepoint重启会怎么消费Kafka

Shuai Xia

Hi,大佬们
突然有一个疑问点,Flink消费kafka的Job保存了SP之后停止,修改代码中的topic名称之后重启。
会从新topic的哪里开始消费?与startup-mode一致,还是从最新呢?
可以手动控制么?
Reply | Threaded
Open this post in threaded view
|

Re:修改topic名称后从Savepoint重启会怎么消费Kafka

熊云昆



可以手动控制的,如果不设置默认是从最新位置开始消费,否则按照你的startup-mode来














在 2020-12-01 20:59:48,"Shuai Xia" <[hidden email]> 写道:
>
>Hi,大佬们
>突然有一个疑问点,Flink消费kafka的Job保存了SP之后停止,修改代码中的topic名称之后重启。
>会从新topic的哪里开始消费?与startup-mode一致,还是从最新呢?
>可以手动控制么?
Reply | Threaded
Open this post in threaded view
|

回复:修改topic名称后从Savepoint重启会怎么消费Kafka

Shuai Xia
hi,实时上并不是你说的这样,从sp重启时因为存在RestoreState,而且Topic名称被修改,会导致restoredState内找不到新的KafkaTopicPartition
新的消费位置会置为EARLIEST_OFFSET


if (restoredState != null) {
   for (KafkaTopicPartition partition : allPartitions) {
      if (!restoredState.containsKey(partition)) {
         restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
      }
   }




------------------------------------------------------------------
发件人:熊云昆 <[hidden email]>
发送时间:2020年12月1日(星期二) 22:57
收件人:user-zh <[hidden email]>; Shuai Xia <[hidden email]>
主 题:Re:修改topic名称后从Savepoint重启会怎么消费Kafka


可以手动控制的,如果不设置默认是从最新位置开始消费,否则按照你的startup-mode来





在 2020-12-01 20:59:48,"Shuai Xia" <[hidden email]> 写道:
>
>Hi,大佬们
>突然有一个疑问点,Flink消费kafka的Job保存了SP之后停止,修改代码中的topic名称之后重启。
>会从新topic的哪里开始消费?与startup-mode一致,还是从最新呢?
>可以手动控制么?


Reply | Threaded
Open this post in threaded view
|

Re: 修改topic名称后从Savepoint重启会怎么消费Kafka

zhisheng
这个是正解,参考之前提的一个 Issue https://issues.apache.org/jira/browse/FLINK-16865

Best
zhisheng

Shuai Xia <[hidden email]> 于2020年12月2日周三 下午2:03写道:

>
> hi,实时上并不是你说的这样,从sp重启时因为存在RestoreState,而且Topic名称被修改,会导致restoredState内找不到新的KafkaTopicPartition
> 新的消费位置会置为EARLIEST_OFFSET
>
>
> if (restoredState != null) {
>    for (KafkaTopicPartition partition : allPartitions) {
>       if (!restoredState.containsKey(partition)) {
>          restoredState.put(partition,
> KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
>       }
>    }
>
>
>
>
> ------------------------------------------------------------------
> 发件人:熊云昆 <[hidden email]>
> 发送时间:2020年12月1日(星期二) 22:57
> 收件人:user-zh <[hidden email]>; Shuai Xia <[hidden email]>
> 主 题:Re:修改topic名称后从Savepoint重启会怎么消费Kafka
>
>
> 可以手动控制的,如果不设置默认是从最新位置开始消费,否则按照你的startup-mode来
>
>
>
>
>
> 在 2020-12-01 20:59:48,"Shuai Xia" <[hidden email]> 写道:
> >
> >Hi,大佬们
> >突然有一个疑问点,Flink消费kafka的Job保存了SP之后停止,修改代码中的topic名称之后重启。
> >会从新topic的哪里开始消费?与startup-mode一致,还是从最新呢?
> >可以手动控制么?
>
>
>