flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

jackjiang
Hi everyone:

如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。

Regards,
JackJiang
Reply | Threaded
Open this post in threaded view
|

Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

wang jinhai
可以选择从之前的某个checkpoint恢复吧


在 2019/8/29 上午10:01,“蒋涛涛”<[hidden email]> 写入:

    Hi everyone:
   
    如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。
   
    Regards,
    JackJiang
   
Reply | Threaded
Open this post in threaded view
|

Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

Yun Tang
Hi 蒋涛涛

Flink的kafka consumer一共有三种offset commit模式:

  1.  OffsetCommitMode.DISABLED                   完全disable offset的commit
  2.  OffsetCommitMode.ON_CHECKPOINTS   Flink的默认行为,只有当Flink checkpoint完成时,才会将offset commit到Kafka
  3.  OffsetCommitMode.KAFKA_PERIODIC     使用Kafka的internal client的默认行为,周期性将offset commit到kafka

如果不想借助checkpoint来重置kafka的offset,可以利用FlinkKafkaConsumerBase 设置 setCommitOffsetsOnCheckpoints(false),以及在kakfa properties里面设置 “auto.commit.enable” 为false,这样就相当于没有commit offset,作业恢复的时候,如果你们设置是从kafka consume from latest,既可以恢复checkpoint中的state(你们应该是想要主要恢复keyed state相关吧),也可以从最新的offset消费。

祝好
唐云
________________________________
From: wang jinhai <[hidden email]>
Sent: Thursday, August 29, 2019 10:25
To: [hidden email] <[hidden email]>
Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

可以选择从之前的某个checkpoint恢复吧


在 2019/8/29 上午10:01,“蒋涛涛”<[hidden email]> 写入:

    Hi everyone:

    如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。

    Regards,
    JackJiang

Reply | Threaded
Open this post in threaded view
|

Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

jackjiang
Hi  Yun Tang,
其实默认情况下,我其实是默认想从checkpoint恢复kafka当前消费的进度的,但是遇到特别情况下,从某个时间点开始消费数据,就像您说的想要主要恢复keyed
state相关数据,如果把setCommitOffsetsOnCheckpoints(false),kakfa properties里面设置
“auto.commit.enable” 为false,这个时候就不提交kafka
offset,如果正常暂停任务的时候,从checkpoint恢复的时候,我就不知道从哪个时间点进行消费了。


Yun Tang <[hidden email]> 于2019年8月29日周四 上午10:57写道:

> Hi 蒋涛涛
>
> Flink的kafka consumer一共有三种offset commit模式:
>
>   1.  OffsetCommitMode.DISABLED                   完全disable offset的commit
>   2.  OffsetCommitMode.ON_CHECKPOINTS   Flink的默认行为,只有当Flink
> checkpoint完成时,才会将offset commit到Kafka
>   3.  OffsetCommitMode.KAFKA_PERIODIC     使用Kafka的internal
> client的默认行为,周期性将offset commit到kafka
>
> 如果不想借助checkpoint来重置kafka的offset,可以利用FlinkKafkaConsumerBase 设置
> setCommitOffsetsOnCheckpoints(false),以及在kakfa properties里面设置
> “auto.commit.enable” 为false,这样就相当于没有commit offset,作业恢复的时候,如果你们设置是从kafka
> consume from latest,既可以恢复checkpoint中的state(你们应该是想要主要恢复keyed
> state相关吧),也可以从最新的offset消费。
>
> 祝好
> 唐云
> ________________________________
> From: wang jinhai <[hidden email]>
> Sent: Thursday, August 29, 2019 10:25
> To: [hidden email] <[hidden email]>
> Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论
>
> 可以选择从之前的某个checkpoint恢复吧
>
>
> 在 2019/8/29 上午10:01,“蒋涛涛”<[hidden email]> 写入:
>
>     Hi everyone:
>
>
> 如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。
>
>     Regards,
>     JackJiang
>
>
Reply | Threaded
Open this post in threaded view
|

Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

Yun Tang
Hi 蒋涛涛

有一种比较hack的方式可以实现,代码里面source是需要根据uid来找到相关的state进行offset恢复,如果你不想通过checkpoint恢复source的state,可以在代码里面手动把source的uid给改掉,同时在从checkpoint恢复时带上 --allowNonRestoredState 参数,这样kafka source从恢复的checkpoint/savepoint里面找不到相关的source state,就会从你设置的offset进行恢复了。

祝好
唐云
________________________________
From: 蒋涛涛 <[hidden email]>
Sent: Thursday, August 29, 2019 11:45
To: [hidden email] <[hidden email]>
Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

Hi  Yun Tang,
其实默认情况下,我其实是默认想从checkpoint恢复kafka当前消费的进度的,但是遇到特别情况下,从某个时间点开始消费数据,就像您说的想要主要恢复keyed
state相关数据,如果把setCommitOffsetsOnCheckpoints(false),kakfa properties里面设置
“auto.commit.enable” 为false,这个时候就不提交kafka
offset,如果正常暂停任务的时候,从checkpoint恢复的时候,我就不知道从哪个时间点进行消费了。


Yun Tang <[hidden email]> 于2019年8月29日周四 上午10:57写道:

> Hi 蒋涛涛
>
> Flink的kafka consumer一共有三种offset commit模式:
>
>   1.  OffsetCommitMode.DISABLED                   完全disable offset的commit
>   2.  OffsetCommitMode.ON_CHECKPOINTS   Flink的默认行为,只有当Flink
> checkpoint完成时,才会将offset commit到Kafka
>   3.  OffsetCommitMode.KAFKA_PERIODIC     使用Kafka的internal
> client的默认行为,周期性将offset commit到kafka
>
> 如果不想借助checkpoint来重置kafka的offset,可以利用FlinkKafkaConsumerBase 设置
> setCommitOffsetsOnCheckpoints(false),以及在kakfa properties里面设置
> “auto.commit.enable” 为false,这样就相当于没有commit offset,作业恢复的时候,如果你们设置是从kafka
> consume from latest,既可以恢复checkpoint中的state(你们应该是想要主要恢复keyed
> state相关吧),也可以从最新的offset消费。
>
> 祝好
> 唐云
> ________________________________
> From: wang jinhai <[hidden email]>
> Sent: Thursday, August 29, 2019 10:25
> To: [hidden email] <[hidden email]>
> Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论
>
> 可以选择从之前的某个checkpoint恢复吧
>
>
> 在 2019/8/29 上午10:01,“蒋涛涛”<[hidden email]> 写入:
>
>     Hi everyone:
>
>
> 如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。
>
>     Regards,
>     JackJiang
>
>
Reply | Threaded
Open this post in threaded view
|

Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

jackjiang
Hi 唐云

你这个方法我可以尝试下(目前我使用的是flink 1.6.2 )

PS:flink 1.9 的 state processor api,应该可以直接修改 savepoint 中的数据,修改下 kafka 的
offset

祝好
蒋涛涛

Yun Tang <[hidden email]> 于2019年8月29日周四 下午12:12写道:

> Hi 蒋涛涛
>
> 有一种比较hack的方式可以实现,代码里面source是需要根据uid来找到相关的state进行offset恢复,如果你不想通过checkpoint恢复source的state,可以在代码里面手动把source的uid给改掉,同时在从checkpoint恢复时带上
> --allowNonRestoredState 参数,这样kafka
> source从恢复的checkpoint/savepoint里面找不到相关的source state,就会从你设置的offset进行恢复了。
>
> 祝好
> 唐云
> ________________________________
> From: 蒋涛涛 <[hidden email]>
> Sent: Thursday, August 29, 2019 11:45
> To: [hidden email] <[hidden email]>
> Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论
>
> Hi  Yun Tang,
>
> 其实默认情况下,我其实是默认想从checkpoint恢复kafka当前消费的进度的,但是遇到特别情况下,从某个时间点开始消费数据,就像您说的想要主要恢复keyed
> state相关数据,如果把setCommitOffsetsOnCheckpoints(false),kakfa properties里面设置
> “auto.commit.enable” 为false,这个时候就不提交kafka
> offset,如果正常暂停任务的时候,从checkpoint恢复的时候,我就不知道从哪个时间点进行消费了。
>
>
> Yun Tang <[hidden email]> 于2019年8月29日周四 上午10:57写道:
>
> > Hi 蒋涛涛
> >
> > Flink的kafka consumer一共有三种offset commit模式:
> >
> >   1.  OffsetCommitMode.DISABLED                   完全disable offset的commit
> >   2.  OffsetCommitMode.ON_CHECKPOINTS   Flink的默认行为,只有当Flink
> > checkpoint完成时,才会将offset commit到Kafka
> >   3.  OffsetCommitMode.KAFKA_PERIODIC     使用Kafka的internal
> > client的默认行为,周期性将offset commit到kafka
> >
> > 如果不想借助checkpoint来重置kafka的offset,可以利用FlinkKafkaConsumerBase 设置
> > setCommitOffsetsOnCheckpoints(false),以及在kakfa properties里面设置
> > “auto.commit.enable” 为false,这样就相当于没有commit offset,作业恢复的时候,如果你们设置是从kafka
> > consume from latest,既可以恢复checkpoint中的state(你们应该是想要主要恢复keyed
> > state相关吧),也可以从最新的offset消费。
> >
> > 祝好
> > 唐云
> > ________________________________
> > From: wang jinhai <[hidden email]>
> > Sent: Thursday, August 29, 2019 10:25
> > To: [hidden email] <[hidden email]>
> > Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论
> >
> > 可以选择从之前的某个checkpoint恢复吧
> >
> >
> > 在 2019/8/29 上午10:01,“蒋涛涛”<[hidden email]> 写入:
> >
> >     Hi everyone:
> >
> >
> >
> 如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。
> >
> >     Regards,
> >     JackJiang
> >
> >
>