[State Processor API] how to convert savepoint back to broadcast state

Eleanore Jin
Hi there,

I would like to read a savepoint (containing broadcast state) back into
broadcast state. How should I do it?

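import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

// load the existing savepoint
ExistingSavepoint existingSavepoint = Savepoint.load(environment,
    "file:///tmp/new_savepoints", new MemoryStateBackend());

// read the broadcast state (String -> String) from the existing savepoint;
// OPERATOR_UID is the uid of the operator that owns the state
DataSet<Tuple2<String, String>> dataSet = existingSavepoint.readBroadcastState(
    OPERATOR_UID, "largeKeySetStateDescription",
    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

// TODO in BroadcastProcessFunction, how can I put the savepoint
// dataset back into BroadcastState?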

Thanks!

Eleanore

Re: [State Processor API] how to convert savepoint back to broadcast state

Yun Tang
Hi Yi

Does the official documentation on writing broadcast state [1] answer your question?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1
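For reference, the writing side described in [1] looks roughly like the sketch below. It is a minimal sketch against the DataSet-based State Processor API used in this thread (Savepoint, OperatorTransformation, BroadcastStateBootstrapFunction); the class name, the operator uid "my-broadcast-operator-uid", the sample entries and the max parallelism of 128 are hypothetical, while the state name, the String types and the savepoint path come from the question. The API may differ in later Flink versions.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

public class BroadcastBootstrapSketch {

    // Hypothetical uid: the streaming operator that restores this state must use the same .uid(...).
    public static final String OPERATOR_UID = "my-broadcast-operator-uid";

    // State name and key/value types must match the descriptor used by the streaming job.
    public static final MapStateDescriptor<String, String> DESCRIPTOR =
            new MapStateDescriptor<>(
                    "largeKeySetStateDescription",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO);

    /** Writes each (key, value) pair into the broadcast state of the new savepoint. */
    public static class KeyValueBootstrapper
            extends BroadcastStateBootstrapFunction<Tuple2<String, String>> {
        @Override
        public void processElement(Tuple2<String, String> value, Context ctx) throws Exception {
            ctx.getBroadcastState(DESCRIPTOR).put(value.f0, value.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical sample entries to bootstrap.
        DataSet<Tuple2<String, String>> entries = env.fromElements(
                Tuple2.of("k1", "v1"),
                Tuple2.of("k2", "v2"));

        BootstrapTransformation<Tuple2<String, String>> transformation = OperatorTransformation
                .bootstrapWith(entries)
                .transform(new KeyValueBootstrapper());

        // 128 is an assumed max parallelism.
        Savepoint.create(new MemoryStateBackend(), 128)
                .withOperator(OPERATOR_UID, transformation)
                .write("file:///tmp/new_savepoints");

        env.execute("bootstrap broadcast state");
    }
}
```

The uid passed to withOperator(...) and the descriptor name are the two values the streaming job has to reuse when it restores from this savepoint.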

Best
Yun Tang
________________________________
From: Jin Yi <[hidden email]>
Sent: Thursday, January 23, 2020 8:12
To: user <[hidden email]>; [hidden email] <[hidden email]>
Subject: [State Processor API] how to convert savepoint back to broadcast state


Re: [State Processor API] how to convert savepoint back to broadcast state

Eleanore Jin
Hi Yun,

Thanks for the response. I have checked the official documentation, and I
followed that example to write the broadcast state to a savepoint.

My question is: I can use the State Processor API to read the savepoint back
into a DataSet, but how can I use that DataSet as the initial value of the
broadcast state in the BroadcastProcessFunction?

Thanks a lot!

Eleanore

On Sun, Jan 26, 2020 at 8:53 AM Yun Tang <[hidden email]> wrote:


Re: [State Processor API] how to convert savepoint back to broadcast state

Eleanore Jin
Hi Yun,

After searching around in the documentation, I wrote a function that extends
BroadcastProcessFunction and implements CheckpointedFunction, and I
initialized the broadcast state in its public void
initializeState(FunctionInitializationContext context) method. It seems to
work fine.

Here is the doc I followed:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction
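A minimal sketch of the approach described above, assuming the String-to-String state named "largeKeySetStateDescription" from the question; the class name RestoringBroadcastFunction and the Tuple2 update stream are hypothetical. Since Flink's broadcast operator already restores broadcast state whose name matches the savepoint (see Yun Tang's reply below), initializeState() here mainly serves as a hook to inspect the restored entries.

```java
import java.util.Map;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RestoringBroadcastFunction
        extends BroadcastProcessFunction<String, Tuple2<String, String>, String>
        implements CheckpointedFunction {

    // Must use the same state name and key/value types as the savepoint.
    public static final MapStateDescriptor<String, String> DESCRIPTOR =
            new MapStateDescriptor<>("largeKeySetStateDescription",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    // Number of entries found in the restored broadcast state (for inspection only).
    private transient long restoredEntries;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Registering the descriptor returns the broadcast state restored from the savepoint, if any.
        BroadcastState<String, String> state =
                context.getOperatorStateStore().getBroadcastState(DESCRIPTOR);
        if (context.isRestored()) {
            for (Map.Entry<String, String> ignored : state.entries()) {
                restoredEntries++;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Nothing to do: state registered in the operator state store is snapshotted by Flink.
    }

    @Override
    public void processElement(String key, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // Look up the element in the (possibly restored) broadcast state.
        String value = ctx.getBroadcastState(DESCRIPTOR).get(key);
        if (value != null) {
            out.collect(key + " -> " + value);
        }
    }

    @Override
    public void processBroadcastElement(Tuple2<String, String> update, Context ctx,
            Collector<String> out) throws Exception {
        // Broadcast updates are applied on top of the restored entries.
        ctx.getBroadcastState(DESCRIPTOR).put(update.f0, update.f1);
    }
}
```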

Thanks a lot for your help!
Eleanore

On Sun, Jan 26, 2020 at 6:53 PM Jin Yi <[hidden email]> wrote:


Re: [State Processor API] how to convert savepoint back to broadcast state

Yun Tang
Hi Yi

Glad to hear you have already resolved it. Note that the State Processor API is planned to move from the DataSet API to the DataStream API in the future [1].

Alternatively, you could follow the guide in "The Broadcast State Pattern" [2]:


// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> stateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState",
        BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(new TypeHint<Rule>() {}));

// broadcast the rules and create the broadcast state
BroadcastStream<Rule> broadcastStream = ruleStream
        .broadcast(stateDescriptor);

colorPartitionedStream
        .connect(broadcastStream)
        .process(
                new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                    // your matching logic: override processElement(...)
                    // and processBroadcastElement(...)
                })
        .uid("your-uid");

Make sure the uid and the state name match those in your savepoint; the CoBroadcastWithKeyedOperator then initializes the broadcast state from the savepoint when it is opened [3].


[1] https://flink.apache.org/feature/2019/09/13/state-processor-api.html#why-dataset-api
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
[3] https://github.com/apache/flink/blob/53f956fb57dd5601d2e3ca9289207d21796cdc4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java#L101
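A minimal end-to-end sketch of the restore side described above, for the non-keyed BroadcastProcessFunction from the question and assuming the non-keyed broadcast operator restores state the same way as the keyed one: no State Processor API call and no CheckpointedFunction are involved, because the descriptor passed to broadcast(...) is registered by the operator and filled from the savepoint when the uid and state name match. The class name, the sample streams and the uid "my-broadcast-operator-uid" are hypothetical; the uid must equal the one used when the savepoint was written.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RestoreBroadcastJobSketch {

    // Hypothetical uid: must equal the uid of the operator whose state is in the savepoint.
    public static final String OPERATOR_UID = "my-broadcast-operator-uid";

    // Must match the state name and key/value types stored in the savepoint.
    public static final MapStateDescriptor<String, String> DESCRIPTOR =
            new MapStateDescriptor<>("largeKeySetStateDescription",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical inputs: keys to look up, and updates to the broadcast state.
        DataStream<String> lookups = env.fromElements("k1", "k2", "k3");
        DataStream<Tuple2<String, String>> updates = env.fromElements(Tuple2.of("k3", "v3"));

        // The descriptor given to broadcast(...) is the state the operator registers on open().
        BroadcastStream<Tuple2<String, String>> broadcast = updates.broadcast(DESCRIPTOR);

        lookups.connect(broadcast)
                .process(new BroadcastProcessFunction<String, Tuple2<String, String>, String>() {
                    @Override
                    public void processElement(String key, ReadOnlyContext ctx,
                            Collector<String> out) throws Exception {
                        // Entries restored from the savepoint are visible here without extra code.
                        String value = ctx.getBroadcastState(DESCRIPTOR).get(key);
                        out.collect(key + " -> " + value);
                    }

                    @Override
                    public void processBroadcastElement(Tuple2<String, String> update,
                            Context ctx, Collector<String> out) throws Exception {
                        ctx.getBroadcastState(DESCRIPTOR).put(update.f0, update.f1);
                    }
                })
                .uid(OPERATOR_UID)
                .print();

        env.execute("restore broadcast state");
    }
}
```

The job has to be started from the savepoint, e.g. flink run -s file:///tmp/new_savepoints -c RestoreBroadcastJobSketch your-job.jar; started without -s, the broadcast state begins empty.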


Best
Yun Tang

________________________________
From: Jin Yi <[hidden email]>
Sent: Monday, January 27, 2020 14:50
To: Yun Tang <[hidden email]>
Cc: user <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: [State Processor API] how to convert savepoint back to broadcast state


Re: [State Processor API] how to convert savepoint back to broadcast state

Eleanore Jin
Hi Yun,

Thanks for the suggestion!

Best
Eleanore

On Mon, Jan 27, 2020 at 1:54 AM Yun Tang <[hidden email]> wrote:
