一系列关于基于状态重启任务的问题

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

一系列关于基于状态重启任务的问题

Zhao,Yi(SEC)
请教几个关于基于状态重启的问题。
问题1:基于检查点/保存点启动时候能否指定部分结点不使用状态。
为什么有这么个需求呢,下面说下背景。
任务A:5分钟粒度的统计PV,使用event time,每10s一次触发更新到数据库。
任务B:天级别任务,利用了状态。
如上任务A和B,我整合为一个大任务提交到flink执行。假设有某种场景下,某些数据错误等,我需要做修复等。并且修复方案需要能做到:从指定时间开始运行(这个是我基于kafkaSouce设置开始时间实现),同时配合一个时间范围过滤算子实现。但是flink如果基于状态重启,则kafkaSouce的offset会基于状态中的offset来做,而不是我配置的开始时间来做。但我又不能不基于状态重启,因为还有任务B是不可容忍丢失状态的。

这种情况怎么搞呢?当然通过flink提供的状态操作API去修改状态可能是一种方式,但感觉成本挺高。或者从保存下的保存点/检查点的路径来看,有没有可能从名字看出哪个状态文件是哪个结点的呢?我能否简单找到kafkaSouce结点的状态文件删除,并且配合flink提供的—allowNonRestoredState实现KafkaSouce不基于状态重启,而其他结点基于状态重启呢?当然也不清楚即使这可行,那么这种情况下KafkaSource是否会按照我设置的开始时间去消费。

问题2:任务合并或拆分问题。
拆分:
仍然假设有任务A和任务B,放在同一个JOB中。如果业务需要拆分开,这个相对容易实现。我只需要做个保存点。然后启动基于保存点任务A(配合—allowNonRestoredState,任务B的状态会被忽略)。再然后启动任务B(配合—allowNonRestoredState,任务A的状态会被忽略)。
合并:
问题来了,合并case怎么做。任务A和任务B如想合并怎么做呢?还是之前那个想法,状态文件结构是否可以直接合并到一起呢?比如任务的保存点文件夹和任务B的保存点文件夹合并后是否可以直接被用?
当然我并不清楚检查点和保存点的保存文件夹中文件命名的含义是否是结点uid啥的。是否可行呢?
Reply | Threaded
Open this post in threaded view
|

Re: 一系列关于基于状态重启任务的问题

Congxian Qiu
hi
   1 checkpoint/savepoint 可以理解为将 状态备份到远程存储,恢复的时候会通过  operator 的 uid 来恢复
state,如果你确定不希望某些 operator 的 state 不进行恢复的话,或者使用不同的 uid
可以达到你的需求,具体的可以看一下这个文档的内容[1]
   2 合并的时候如果想把 savepoint/checkpoint 用起来,还是需要修改 checkpoint/savepoint
的内容,或者你可以试试 state processor api[2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#savepoints
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian


Zhao,Yi(SEC) <[hidden email]> 于2020年8月11日周二 上午11:55写道:

> 请教几个关于基于状态重启的问题。
> 问题1:基于检查点/保存点启动时候能否指定部分结点不使用状态。
> 为什么有这么个需求呢,下面说下背景。
> 任务A:5分钟粒度的统计PV,使用event time,每10s一次触发更新到数据库。
> 任务B:天级别任务,利用了状态。
>
> 如上任务A和B,我整合为一个大任务提交到flink执行。假设有某种场景下,某些数据错误等,我需要做修复等。并且修复方案需要能做到:从指定时间开始运行(这个是我基于kafkaSouce设置开始时间实现),同时配合一个时间范围过滤算子实现。但是flink如果基于状态重启,则kafkaSouce的offset会基于状态中的offset来做,而不是我配置的开始时间来做。但我又不能不基于状态重启,因为还有任务B是不可容忍丢失状态的。
>
>
> 这种情况怎么搞呢?当然通过flink提供的状态操作API去修改状态可能是一种方式,但感觉成本挺高。或者从保存下的保存点/检查点的路径来看,有没有可能从名字看出哪个状态文件是哪个结点的呢?我能否简单找到kafkaSouce结点的状态文件删除,并且配合flink提供的—allowNonRestoredState实现KafkaSouce不基于状态重启,而其他结点基于状态重启呢?当然也不清楚即使这可行,那么这种情况下KafkaSource是否会按照我设置的开始时间去消费。
>
> 问题2:任务合并或拆分问题。
> 拆分:
>
> 仍然假设有任务A和任务B,放在同一个JOB中。如果业务需要拆分开,这个相对容易实现。我只需要做个保存点。然后启动基于保存点任务A(配合—allowNonRestoredState,任务B的状态会被忽略)。再然后启动任务B(配合—allowNonRestoredState,任务A的状态会被忽略)。
> 合并:
>
> 问题来了,合并case怎么做。任务A和任务B如想合并怎么做呢?还是之前那个想法,状态文件结构是否可以直接合并到一起呢?比如任务的保存点文件夹和任务B的保存点文件夹合并后是否可以直接被用?
> 当然我并不清楚检查点和保存点的保存文件夹中文件命名的含义是否是结点uid啥的。是否可行呢?
>