Re: [State Processor API] how to convert savepoint back to broadcast state

Posted by Eleanore Jin on Jan 27, 2020; 5:12pm
URL: http://apache-flink.370.s1.nabble.com/State-Processor-API-how-to-convert-savepoint-back-to-broadcast-state-tp1640p1651.html

Hi Yun,

Thanks for the suggestion!

Best
Eleanore

On Mon, Jan 27, 2020 at 1:54 AM Yun Tang <[hidden email]> wrote:

> Hi Yi
>
> Glad to know you have already resolved it. The State Processor API will use
> the DataStream API instead of the DataSet API in the future [1].
>
> Besides, you could also follow the guide in "The Broadcast State
> Pattern" [2]:
>
> // a map descriptor to store the name of the rule (string) and the rule itself.
> MapStateDescriptor<String, Rule> stateDescriptor = new MapStateDescriptor<>(
>         "RulesBroadcastState",
>         BasicTypeInfo.STRING_TYPE_INFO,
>         TypeInformation.of(new TypeHint<Rule>() {}));
>
> // broadcast the rules and create the broadcast state
> BroadcastStream<Rule> broadcastStream = ruleStream
>         .broadcast(stateDescriptor);
>
> colorPartitionedStream
>         .connect(broadcastStream)
>         .process(
>                 new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
>                     // my matching logic
>                 }
>         ).uid("your-uid");
>
> Make sure the uid and the state name are the same as those in your
> savepoint; the CoBroadcastWithKeyedOperator will then initialize the broadcast
> state when it opens [3].
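To illustrate why both identifiers in the note above have to match, here is a minimal plain-Java toy model. It is not Flink code and uses no Flink classes: the class name SavepointLookupModel and its put/restore methods are hypothetical, made up purely for illustration. It only mimics the idea that restored state is found by operator uid first and by state name second, so a mismatch in either one leaves the operator with nothing to restore. In this model a mismatch simply yields an empty map; a real restore may react differently.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Flink code. It mimics looking up restored state
// by operator uid first and then by state name.
class SavepointLookupModel {

    // operator uid -> (state name -> broadcast state entries)
    private final Map<String, Map<String, Map<String, String>>> states = new HashMap<>();

    // Record one broadcast-state entry, as if it had been written to a savepoint.
    void put(String uid, String stateName, String key, String value) {
        states.computeIfAbsent(uid, u -> new HashMap<>())
              .computeIfAbsent(stateName, n -> new HashMap<>())
              .put(key, value);
    }

    // Return the entries stored under (uid, stateName), or an empty map when
    // either identifier does not match what was written.
    Map<String, String> restore(String uid, String stateName) {
        return states.getOrDefault(uid, Collections.emptyMap())
                     .getOrDefault(stateName, Collections.emptyMap());
    }

    public static void main(String[] args) {
        SavepointLookupModel savepoint = new SavepointLookupModel();
        savepoint.put("your-uid", "RulesBroadcastState", "rule-1", "red -> blue");

        // Both identifiers match: the entry is found.
        System.out.println(savepoint.restore("your-uid", "RulesBroadcastState")); // {rule-1=red -> blue}
        // State name differs only in case: nothing is found.
        System.out.println(savepoint.restore("your-uid", "rulesBroadcastState")); // {}
        // Operator uid differs: nothing is found.
        System.out.println(savepoint.restore("other-uid", "RulesBroadcastState")); // {}
    }
}
```

The lookup is exact and case-sensitive in this sketch, which is why the second call comes back empty even though the names look almost identical.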
>
>
> [1]
> https://flink.apache.org/feature/2019/09/13/state-processor-api.html#why-dataset-api
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
> [3]
> https://github.com/apache/flink/blob/53f956fb57dd5601d2e3ca9289207d21796cdc4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java#L101
>
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Jin Yi <[hidden email]>
> *Sent:* Monday, January 27, 2020 14:50
> *To:* Yun Tang <[hidden email]>
> *Cc:* user <[hidden email]>; [hidden email] <
> [hidden email]>
> *Subject:* Re: [State Processor API] how to convert savepoint back to
> broadcast state
>
> Hi Yun,
>
> After searching around in the documentation, I tried a class that extends
> BroadcastProcessFunction and implements CheckpointedFunction, and
> initialized the broadcast state in the public void initializeState(FunctionInitializationContext
> context) method. It seems to work fine.
>
> Here is the doc I followed:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction
>
> Thanks a lot for your help!
> Eleanore
>
> On Sun, Jan 26, 2020 at 6:53 PM Jin Yi <[hidden email]> wrote:
>
> Hi Yun,
>
> Thanks for the response. I have checked the official documentation and
> followed this example to write the broadcast state to a savepoint.
>
> My question is: I can use the State Processor API to read the savepoint back
> into a DataSet, but how can I use that DataSet as the initial value for the
> broadcast state in the BroadcastProcessFunction?
>
> Thanks a lot!
>
> Eleanore
>
> On Sun, Jan 26, 2020 at 8:53 AM Yun Tang <[hidden email]> wrote:
>
> Hi Yi
>
> Does the official doc on writing broadcast state [1] satisfy your
> request?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1
>
> Best
> Yun Tang
> ------------------------------
> *From:* Jin Yi <[hidden email]>
> *Sent:* Thursday, January 23, 2020 8:12
> *To:* user <[hidden email]>; [hidden email] <
> [hidden email]>
> *Subject:* [State Processor API] how to convert savepoint back to
> broadcast state
>
> Hi there,
>
> I would like to read a savepoint (containing broadcast state) back into the
> broadcast state. How should I do it?
>
> // load the existing savepoint
> ExistingSavepoint existingSavepoint = Savepoint.load(environment, "file:///tmp/new_savepoints", new MemoryStateBackend());
>
> // read the broadcast state from the existing savepoint as (key, value) tuples
> DataSet<Tuple2<String, String>> dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, "largeKeySetStateDescription", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
>
> // TODO in BroadcastProcessFunction, how can I put the savepoint dataset back into BroadcastState?
>
> Thanks!
>
> Eleanore
>
>