Question regarding checkpoint/savepoint and State Processor API

Eleanore Jin
Hi there,

1. In my job I have a broadcast stream, and initially there is no savepoint
that can be used to bootstrap the broadcast state.

  BootstrapTransformation transform =
      OperatorTransformation.bootstrapWith(dataSet).transform(bootstrapFunction);

  Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
      .withOperator(OPERATOR_UID, transform)
      .write("file:///tmp/new_savepoints");

Question: bootstrapWith(dataSet) is required, and normally the dataSet comes
from an old savepoint. In this case I don't have one, so how should I deal
with it? Or is it strictly required?

2. As messages come through the broadcast stream, the state gets updated.

3. I would like to periodically save the broadcast state to a file via
savepoints:

  Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
      .withOperator(OPERATOR_UID, transform)
      .write("file:///tmp/new_savepoints");

4. When the job gets cancelled and is restarted later, the initial
broadcast state should be loaded from the previous savepoint:

  ExistingSavepoint existingSavepoint = Savepoint.load(environment,
      "file:///tmp/smarts/checkpoints/85b69cb38897b9ac66a925fee4ecea2c/chk-5",
      new MemoryStateBackend());

  dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, OPERATOR_NAME,
      BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

Question: now assume I got the old state as dataSet. How can I use it
in the BroadcastProcessFunction as the initial state of the broadcast
state?
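
As Seth's reply in this thread points out, the DataSet passed to bootstrapWith can come from anything, not only from an old savepoint. The following is a minimal sketch for question 1, assuming Flink 1.9/1.10 with the flink-state-processor-api dependency on the classpath. The initial entries are literal Tuple2 elements (a file read with env.readCsvFile(...) would work the same way), and the uid broadcast-operator, the state name rules and the max parallelism of 128 are hypothetical values. The name given to the MapStateDescriptor is also what readBroadcastState expects as its second argument when the state is read back.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

public class BootstrapBroadcastState {

    // Hypothetical names: they must match the streaming job's operator uid
    // and the MapStateDescriptor that job uses for its broadcast state.
    static final String OPERATOR_UID = "broadcast-operator";
    static final MapStateDescriptor<String, String> RULES_DESCRIPTOR =
        new MapStateDescriptor<>("rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    /** Copies every (key, value) pair of the input DataSet into the broadcast state. */
    static class RulesBootstrapper extends BroadcastStateBootstrapFunction<Tuple2<String, String>> {
        @Override
        public void processElement(Tuple2<String, String> entry, Context ctx) throws Exception {
            ctx.getBroadcastState(RULES_DESCRIPTOR).put(entry.f0, entry.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The initial values do not have to come from a savepoint: any DataSet works.
        DataSet<Tuple2<String, String>> initialRules = env.fromElements(
            Tuple2.of("a", "1"),
            Tuple2.of("b", "2"));

        BootstrapTransformation<Tuple2<String, String>> transform =
            OperatorTransformation.bootstrapWith(initialRules).transform(new RulesBootstrapper());

        // The second argument of Savepoint.create is the max parallelism of the new savepoint.
        Savepoint.create(new MemoryStateBackend(), 128)
            .withOperator(OPERATOR_UID, transform)
            .write("file:///tmp/new_savepoints");

        // write(...) only declares a DataSet sink; the batch job runs here.
        env.execute("bootstrap broadcast state");
    }
}
```

The resulting directory is then used once, to start the streaming job for the first time; later restarts use the job's own savepoints.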

Thanks a lot for the help!

Re: Question regarding checkpoint/savepoint and State Processor API

Eleanore Jin
Hi Seth,

Thanks for the prompt response! Regarding my second question: once I have
converted the existing savepoint to a DataSet, how can I turn that DataSet
into BroadcastState?

For example, in my BroadcastProcessFunction:

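@Override
public void processBroadcastElement(String key, Context context,
        Collector<JsonNode> collector) throws Exception {
    // TODO: how do I add the existing BroadcastState from the savepoint beforehand?
    // BROADCAST_STATE_DESCRIPTOR is a placeholder for the job's MapStateDescriptor<String, String>
    BroadcastState<String, String> broadcastState =
        context.getBroadcastState(BROADCAST_STATE_DESCRIPTOR);
    broadcastState.put(key, key);
}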
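
On the streaming side there is no DataSet to convert. When the job is started from a savepoint (for example with flink run -s <savepointPath>), Flink restores the broadcast state before the function sees any element, and getBroadcastState simply returns the restored entries. What has to line up is the operator uid passed to withOperator and the MapStateDescriptor (name and types) used when the state was written. A minimal sketch, assuming Flink 1.9/1.10; the state name rules, the uid broadcast-operator and the fromElements sources are hypothetical, and String output stands in for JsonNode to keep it self-contained.

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RulesJob {

    // Must match the state name and types used when the savepoint was written ...
    static final MapStateDescriptor<String, String> RULES_DESCRIPTOR =
        new MapStateDescriptor<>("rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    // ... and this must match the uid the state was written under (hypothetical value).
    static final String OPERATOR_UID = "broadcast-operator";

    static class RulesFunction extends BroadcastProcessFunction<String, String, String> {
        @Override
        public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            // After a restore this already holds the entries from the savepoint.
            ReadOnlyBroadcastState<String, String> rules = ctx.getBroadcastState(RULES_DESCRIPTOR);
            String rule = rules.get(event);
            if (rule != null) {
                out.collect(event + " -> " + rule);
            }
        }

        @Override
        public void processBroadcastElement(String key, Context ctx, Collector<String> out) throws Exception {
            // New broadcast messages update the restored state in place.
            BroadcastState<String, String> rules = ctx.getBroadcastState(RULES_DESCRIPTOR);
            rules.put(key, key);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder sources; a real job would read from Kafka, a socket, etc.
        DataStream<String> events = env.fromElements("a", "b", "c");
        BroadcastStream<String> ruleUpdates = env.fromElements("c").broadcast(RULES_DESCRIPTOR);

        events.connect(ruleUpdates)
            .process(new RulesFunction())
            .uid(OPERATOR_UID) // ties this operator to the state stored in the savepoint
            .print();

        env.execute("rules job");
    }
}
```

Started without a savepoint, the same job just begins with an empty broadcast state, so no special-casing is needed in the function.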

Thanks a lot!

On Tue, Jan 21, 2020 at 7:12 AM Seth Wiesman <[hidden email]> wrote:

> Hi Eleanore,
> Bootstrap data is not required to come from an existing savepoint. It can
> come from any DataSet, which could be backed by a file, a database, or any
> other system. The State Processor API is also not a tool you are going to
> use between every start and stop of your job. It is only for bootstrapping
> the initial state of your application. After that, you will use savepoints
> to carry over the current state of your application between runs.
> On Mon, Jan 20, 2020 at 6:07 PM Jin Yi <[hidden email]> wrote:
>> Hi there,
>> 1. In my job I have a broadcast stream, and initially there is no savepoint
>> that can be used to bootstrap the broadcast state.
>>   BootstrapTransformation transform =
>>       OperatorTransformation.bootstrapWith(dataSet).transform(bootstrapFunction);
>>   Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
>>       .withOperator(OPERATOR_UID, transform)
>>       .write("file:///tmp/new_savepoints");
>> Question: bootstrapWith(dataSet) is required, and normally the dataSet comes
>> from an old savepoint. In this case I don't have one, so how should I deal
>> with it? Or is it strictly required?
>> 2. As messages come through the broadcast stream, the state gets updated.
>> 3. I would like to periodically save the broadcast state to a file via
>> savepoints:
>>   Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
>>       .withOperator(OPERATOR_UID, transform)
>>       .write("file:///tmp/new_savepoints");
>> 4. When the job gets cancelled and is restarted later, the initial
>> broadcast state should be loaded from the previous savepoint:
>>   ExistingSavepoint existingSavepoint = Savepoint.load(environment,
>>       "file:///tmp/smarts/checkpoints/85b69cb38897b9ac66a925fee4ecea2c/chk-5",
>>       new MemoryStateBackend());
>>   dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, OPERATOR_NAME,
>>       BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
>> Question: now assume I got the old state as dataSet. How can I use it in the
>> BroadcastProcessFunction as the initial state of the broadcast state?
>> Thanks a lot for the help!
>> Eleanore
> --
> Seth Wiesman | Solutions Architect
> +1 314 387 1463
> <https://www.ververica.com/>
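
Step 3 of the original question (periodically saving the broadcast state) does not need the State Processor API either: Flink's own checkpoints already contain all operator state, broadcast state included. Below is a minimal sketch, assuming Flink 1.9/1.10, that takes a checkpoint at a fixed interval and keeps the latest one when the job is cancelled. The checkpoint directory is inferred from the chk-5 path in the question; the class name and interval are hypothetical. A savepoint can also be triggered on demand with flink savepoint <jobId> [targetDirectory], and the job can be restarted from either kind of snapshot with flink run -s <path>.

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {

    // Inferred from the chk-5 path in the question; use a durable location in production.
    static final String CHECKPOINT_DIR = "file:///tmp/smarts/checkpoints";

    /** Creates an environment that snapshots all operator state every intervalMs milliseconds. */
    static StreamExecutionEnvironment createEnv(long intervalMs) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Periodic checkpoints include the broadcast state of every operator.
        env.enableCheckpointing(intervalMs);

        // Keep the last completed checkpoint when the job is cancelled,
        // so its directory can be passed to "flink run -s" on the next start.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Checkpoints are written under <CHECKPOINT_DIR>/<jobId>/chk-<n>.
        env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
        return env;
    }
}
```

The broadcast pipeline is then built on the returned environment as usual; nothing in the BroadcastProcessFunction changes.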