Hi everyone:
如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。 Regards, JackJiang |
可以选择从之前的某个checkpoint恢复吧
在 2019/8/29 上午10:01,“蒋涛涛”<[hidden email]> 写入: Hi everyone: 如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。 Regards, JackJiang |
Hi 蒋涛涛
Flink的kafka consumer一共有三种offset commit模式: 1. OffsetCommitMode.DISABLED 完全disable offset的commit 2. OffsetCommitMode.ON_CHECKPOINTS Flink的默认行为,只有当Flink checkpoint完成时,才会将offset commit到Kafka 3. OffsetCommitMode.KAFKA_PERIODIC 使用Kafka的internal client的默认行为,周期性将offset commit到kafka 如果不想借助checkpoint来重置kafka的offset,可以利用FlinkKafkaConsumerBase 设置 setCommitOffsetsOnCheckpoints(false),以及在kakfa properties里面设置 “auto.commit.enable” 为false,这样就相当于没有commit offset,作业恢复的时候,如果你们设置是从kafka consume from latest,既可以恢复checkpoint中的state(你们应该是想要主要恢复keyed state相关吧),也可以从最新的offset消费。 祝好 唐云 ________________________________ From: wang jinhai <[hidden email]> Sent: Thursday, August 29, 2019 10:25 To: [hidden email] <[hidden email]> Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论 可以选择从之前的某个checkpoint恢复吧 在 2019/8/29 上午10:01,“蒋涛涛”<[hidden email]> 写入: Hi everyone: 如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。 Regards, JackJiang |
Hi Yun Tang,
其实默认情况下,我其实是默认想从checkpoint恢复kafka当前消费的进度的,但是遇到特别情况下,从某个时间点开始消费数据,就像您说的想要主要恢复keyed state相关数据,如果把setCommitOffsetsOnCheckpoints(false),kakfa properties里面设置 “auto.commit.enable” 为false,这个时候就不提交kafka offset,如果正常暂停任务的时候,从checkpoint恢复的时候,我就不知道从哪个时间点进行消费了。 Yun Tang <[hidden email]> 于2019年8月29日周四 上午10:57写道: > Hi 蒋涛涛 > > Flink的kafka consumer一共有三种offset commit模式: > > 1. OffsetCommitMode.DISABLED 完全disable offset的commit > 2. OffsetCommitMode.ON_CHECKPOINTS Flink的默认行为,只有当Flink > checkpoint完成时,才会将offset commit到Kafka > 3. OffsetCommitMode.KAFKA_PERIODIC 使用Kafka的internal > client的默认行为,周期性将offset commit到kafka > > 如果不想借助checkpoint来重置kafka的offset,可以利用FlinkKafkaConsumerBase 设置 > setCommitOffsetsOnCheckpoints(false),以及在kakfa properties里面设置 > “auto.commit.enable” 为false,这样就相当于没有commit offset,作业恢复的时候,如果你们设置是从kafka > consume from latest,既可以恢复checkpoint中的state(你们应该是想要主要恢复keyed > state相关吧),也可以从最新的offset消费。 > > 祝好 > 唐云 > ________________________________ > From: wang jinhai <[hidden email]> > Sent: Thursday, August 29, 2019 10:25 > To: [hidden email] <[hidden email]> > Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论 > > 可以选择从之前的某个checkpoint恢复吧 > > > 在 2019/8/29 上午10:01,“蒋涛涛”<[hidden email]> 写入: > > Hi everyone: > > > 如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。 > > Regards, > JackJiang > > |
Hi 蒋涛涛
有一种比较hack的方式可以实现,代码里面source是需要根据uid来找到相关的state进行offset恢复,如果你不想通过checkpoint恢复source的state,可以在代码里面手动把source的uid给改掉,同时在从checkpoint恢复时带上 --allowNonRestoredState 参数,这样kafka source从恢复的checkpoint/savepoint里面找不到相关的source state,就会从你设置的offset进行恢复了。 祝好 唐云 ________________________________ From: 蒋涛涛 <[hidden email]> Sent: Thursday, August 29, 2019 11:45 To: [hidden email] <[hidden email]> Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论 Hi Yun Tang, 其实默认情况下,我其实是默认想从checkpoint恢复kafka当前消费的进度的,但是遇到特别情况下,从某个时间点开始消费数据,就像您说的想要主要恢复keyed state相关数据,如果把setCommitOffsetsOnCheckpoints(false),kakfa properties里面设置 “auto.commit.enable” 为false,这个时候就不提交kafka offset,如果正常暂停任务的时候,从checkpoint恢复的时候,我就不知道从哪个时间点进行消费了。 Yun Tang <[hidden email]> 于2019年8月29日周四 上午10:57写道: > Hi 蒋涛涛 > > Flink的kafka consumer一共有三种offset commit模式: > > 1. OffsetCommitMode.DISABLED 完全disable offset的commit > 2. OffsetCommitMode.ON_CHECKPOINTS Flink的默认行为,只有当Flink > checkpoint完成时,才会将offset commit到Kafka > 3. OffsetCommitMode.KAFKA_PERIODIC 使用Kafka的internal > client的默认行为,周期性将offset commit到kafka > > 如果不想借助checkpoint来重置kafka的offset,可以利用FlinkKafkaConsumerBase 设置 > setCommitOffsetsOnCheckpoints(false),以及在kakfa properties里面设置 > “auto.commit.enable” 为false,这样就相当于没有commit offset,作业恢复的时候,如果你们设置是从kafka > consume from latest,既可以恢复checkpoint中的state(你们应该是想要主要恢复keyed > state相关吧),也可以从最新的offset消费。 > > 祝好 > 唐云 > ________________________________ > From: wang jinhai <[hidden email]> > Sent: Thursday, August 29, 2019 10:25 > To: [hidden email] <[hidden email]> > Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论 > > 可以选择从之前的某个checkpoint恢复吧 > > > 在 2019/8/29 上午10:01,“蒋涛涛”<[hidden email]> 写入: > > Hi everyone: > > > 如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。 > > Regards, > JackJiang > > |
Hi 唐云
你这个方法我可以尝试下(目前我使用的是flink 1.6.2 ) PS:flink 1.9 的 state processor api,应该可以直接修改 savepoint 中的数据,修改下 kafka 的 offset 祝好 蒋涛涛 Yun Tang <[hidden email]> 于2019年8月29日周四 下午12:12写道: > Hi 蒋涛涛 > > 有一种比较hack的方式可以实现,代码里面source是需要根据uid来找到相关的state进行offset恢复,如果你不想通过checkpoint恢复source的state,可以在代码里面手动把source的uid给改掉,同时在从checkpoint恢复时带上 > --allowNonRestoredState 参数,这样kafka > source从恢复的checkpoint/savepoint里面找不到相关的source state,就会从你设置的offset进行恢复了。 > > 祝好 > 唐云 > ________________________________ > From: 蒋涛涛 <[hidden email]> > Sent: Thursday, August 29, 2019 11:45 > To: [hidden email] <[hidden email]> > Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论 > > Hi Yun Tang, > > 其实默认情况下,我其实是默认想从checkpoint恢复kafka当前消费的进度的,但是遇到特别情况下,从某个时间点开始消费数据,就像您说的想要主要恢复keyed > state相关数据,如果把setCommitOffsetsOnCheckpoints(false),kakfa properties里面设置 > “auto.commit.enable” 为false,这个时候就不提交kafka > offset,如果正常暂停任务的时候,从checkpoint恢复的时候,我就不知道从哪个时间点进行消费了。 > > > Yun Tang <[hidden email]> 于2019年8月29日周四 上午10:57写道: > > > Hi 蒋涛涛 > > > > Flink的kafka consumer一共有三种offset commit模式: > > > > 1. OffsetCommitMode.DISABLED 完全disable offset的commit > > 2. OffsetCommitMode.ON_CHECKPOINTS Flink的默认行为,只有当Flink > > checkpoint完成时,才会将offset commit到Kafka > > 3. OffsetCommitMode.KAFKA_PERIODIC 使用Kafka的internal > > client的默认行为,周期性将offset commit到kafka > > > > 如果不想借助checkpoint来重置kafka的offset,可以利用FlinkKafkaConsumerBase 设置 > > setCommitOffsetsOnCheckpoints(false),以及在kakfa properties里面设置 > > “auto.commit.enable” 为false,这样就相当于没有commit offset,作业恢复的时候,如果你们设置是从kafka > > consume from latest,既可以恢复checkpoint中的state(你们应该是想要主要恢复keyed > > state相关吧),也可以从最新的offset消费。 > > > > 祝好 > > 唐云 > > ________________________________ > > From: wang jinhai <[hidden email]> > > Sent: Thursday, August 29, 2019 10:25 > > To: [hidden email] <[hidden email]> > > Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论 > > > > 可以选择从之前的某个checkpoint恢复吧 > > > > > > 在 2019/8/29 上午10:01,“蒋涛涛”<[hidden email]> 写入: > > > > Hi everyone: > > > > > > > 如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。 > > > > Regards, > > JackJiang > > > > > |
Free forum by Nabble | Edit this page |