Hi there,
I would like to read a savepoint (containing broadcast state) back into broadcast state. How should I do it?

    // load the existing savepoint
    ExistingSavepoint existingSavepoint = Savepoint.load(environment, "file:///tmp/new_savepoints", new MemoryStateBackend());

    // read the broadcast state from the existing savepoint
    DataSet<Tuple2<String, String>> dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, "largeKeySetStateDescription", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    // TODO: in BroadcastProcessFunction, how can I put the savepoint DataSet back into BroadcastState?

Thanks!
Eleanore
Hi Yi
Does the official documentation on writing broadcast state [1] satisfy your request?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1

Best
Yun Tang

From: Jin Yi
Sent: Thursday, January 23, 2020 8:12
Subject: [State Processor API] how to convert savepoint back to broadcast state

> I would like to read the savepoints (for broadcast state) back into the broadcast state, how should I do it?
Hi Yun,
Thanks for the response. I have checked the official documentation and followed that example to write the broadcast state to a savepoint.

My question is: I can use the State Processor API to read the savepoint back into a DataSet, but how can I use that DataSet as the initial value for the broadcast state in the BroadcastProcessFunction?

Thanks a lot!
Eleanore

On Sun, Jan 26, 2020 at 8:53 AM Yun Tang wrote:
> Does the official documentation on writing broadcast state [1] satisfy your request?
Hi Yun,
After searching the documentation, I tried extending BroadcastProcessFunction and implementing CheckpointedFunction. I initialized the broadcast state in the public void initializeState(FunctionInitializationContext context) method, and it seems to work fine.

Here is the doc I followed: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction

Thanks a lot for your help!
Eleanore

On Sun, Jan 26, 2020 at 6:53 PM Jin Yi wrote:
> I can use state processor api to read back the savepoint into a dataSet, but how can I use the dataSet as the initial value for the broadcast state in the BroadcastProcessFunction.
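For readers following the thread, the approach described above (a BroadcastProcessFunction that also implements CheckpointedFunction) might look roughly like the sketch below. The poster's actual code is not shown in the thread, so this is only an illustration under assumptions: the class name RestoringBroadcastFunction, the String type parameters, the placeholder entry, and the processing logic are all hypothetical. It assumes a Flink release of that period (around 1.9 or 1.10) with flink-streaming-java on the classpath, and it reuses the state name from the original question.

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical class; a sketch, not the poster's implementation.
public class RestoringBroadcastFunction
        extends BroadcastProcessFunction<String, String, String>
        implements CheckpointedFunction {

    // The state name must be the same as the one stored in the savepoint.
    private static final MapStateDescriptor<String, String> DESCRIPTOR =
            new MapStateDescriptor<>(
                    "largeKeySetStateDescription",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO);

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Registers the broadcast state under the descriptor's name and returns its handle.
        BroadcastState<String, String> state =
                context.getOperatorStateStore().getBroadcastState(DESCRIPTOR);

        if (!context.isRestored()) {
            // Fresh start without a savepoint: seed a placeholder entry (illustrative only).
            state.put("example-key", "example-value");
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Nothing to do in this sketch; no extra state is kept outside the broadcast state.
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // Look up the incoming element in the (read-only) broadcast state.
        String match = ctx.getBroadcastState(DESCRIPTOR).get(value);
        if (match != null) {
            out.collect(value + " -> " + match);
        }
    }

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        // New broadcast elements update the broadcast state.
        ctx.getBroadcastState(DESCRIPTOR).put(value, value);
    }
}
```

When the job is started from the savepoint, the operator using this function would also need the same uid as the operator that wrote the state.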
Hi Yi
Glad to know you have already resolved it. The State Processor API will use the DataStream API instead of the DataSet API in the future [1].

Besides, you could also follow the guide in "The Broadcast State Pattern" [2]:

    // a map descriptor to store the name of the rule (string) and the rule itself
    MapStateDescriptor<String, Rule> stateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState",
        BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(new TypeHint<Rule>() {}));

    // broadcast the rules and create the broadcast state
    BroadcastStream<Rule> broadcastStream = ruleStream
        .broadcast(stateDescriptor);

    colorPartitionedStream
        .connect(broadcastStream)
        .process(
            new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                // my matching logic
            }
        ).uid("your-uid");

Make sure the uid and the state name are the same as those in your savepoint. The CoBroadcastWithKeyedOperator will then initialize the broadcast state when it opens [3].

[1] https://flink.apache.org/feature/2019/09/13/state-processor-api.html#why-dataset-api
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
[3] https://github.com/apache/flink/blob/53f956fb57dd5601d2e3ca9289207d21796cdc4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java#L101

Best
Yun Tang

From: Jin Yi
Sent: Monday, January 27, 2020 14:50
Subject: Re: [State Processor API] how to convert savepoint back to broadcast state

> I tried extends BroadcastProcessFunction implements CheckpointedFunction. And I have initialized broadcast state in public void initializeState(FunctionInitializationContext context) method, it seems working fine.
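The advice about matching the uid and the state name can be pictured with a toy model in plain Java. This is not Flink code and has no Flink dependency: SavepointLookupModel and its write and restore methods are hypothetical names invented for illustration. The model only captures the idea that state is stored under an (operator uid, state name) pair and is found again only under exactly the same pair. It is deliberately simplified: real Flink may refuse to start a job when savepoint state cannot be mapped to any operator, rather than silently handing back empty state.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy model only: none of these names are Flink classes or methods. */
public class SavepointLookupModel {

    // operator uid -> (state name -> state entries)
    private final Map<String, Map<String, Map<String, String>>> operators = new HashMap<>();

    /** Stores a copy of the entries under the given uid and state name. */
    public void write(String uid, String stateName, Map<String, String> entries) {
        operators.computeIfAbsent(uid, k -> new HashMap<>())
                 .put(stateName, new HashMap<>(entries));
    }

    /** Returns the stored entries, or an empty map if uid or state name does not match. */
    public Map<String, String> restore(String uid, String stateName) {
        return operators
                .getOrDefault(uid, Collections.emptyMap())
                .getOrDefault(stateName, Collections.emptyMap());
    }

    public static void main(String[] args) {
        SavepointLookupModel savepoint = new SavepointLookupModel();
        savepoint.write("your-uid", "RulesBroadcastState",
                Collections.singletonMap("r1", "rule-1"));

        // Same uid and same state name: the entries are found.
        System.out.println(savepoint.restore("your-uid", "RulesBroadcastState")); // {r1=rule-1}
        // Different uid: nothing is found.
        System.out.println(savepoint.restore("other-uid", "RulesBroadcastState")); // {}
        // Same uid but different state name: nothing is found.
        System.out.println(savepoint.restore("your-uid", "OtherState")); // {}
    }
}
```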
Hi Yun,
Thanks for the suggestion!

Best
Eleanore

On Mon, Jan 27, 2020 at 1:54 AM Yun Tang wrote:
> Make sure the uid and the state-name are the same with those in your savepoint, the CoBroadcastWithKeyedOperator would initialize the broadcast state when opening. [3]