Hi there,
1. In my job I have a broadcast stream. Initially there is no savepoint that can be used to bootstrap the broadcast stream's state:

    BootstrapTransformation transform =
        OperatorTransformation.bootstrapWith(dataSet).transform(bootstrapFunction);

    Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
        .withOperator(OPERATOR_UID, transform)
        .write("file:///tmp/new_savepoints");

Question: bootstrapWith(dataSet) requires a DataSet. Normally the DataSet comes from an old savepoint, but in this case I don't have one. How should I deal with this? Or is it strictly required?

2. As messages come through the broadcast stream, the state gets updated.

3. I would like to periodically save the broadcast state to a file via savepoints:

    Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
        .withOperator(OPERATOR_UID, transform)
        .write("file:///tmp/new_savepoints");

4. When the job gets cancelled and is restarted later, the initial broadcast state should be loaded from the previous savepoint:

    ExistingSavepoint existingSavepoint = Savepoint.load(environment,
        "file:///tmp/smarts/checkpoints/85b69cb38897b9ac66a925fee4ecea2c/chk-5",
        new MemoryStateBackend());

    dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, OPERATOR_NAME,
        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

Question: assuming I now have the old state as a DataSet, how can I use it in the BroadcastProcessFunction as the initial state of the broadcast state?

Thanks a lot for the help!
Eleanore
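A minimal sketch for question 1, along the lines of Seth's reply below: the bootstrap DataSet can come from anywhere, so here the initial keys simply come from `env.fromElements(...)` instead of a savepoint. It assumes Flink 1.9/1.10 with the `flink-state-processor-api` dependency on the classpath. The class name and the `OPERATOR_UID`, `STATE_NAME` and `MAX_PARALLELISM` values are hypothetical; the uid must match the `.uid(...)` of the broadcast operator in the streaming job, and `STATE_NAME` must match its `MapStateDescriptor` name (the same name `readBroadcastState` takes as its second argument).

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

public class BootstrapBroadcastState {

    // Hypothetical values: they must match the uid and descriptor name used by the streaming job.
    static final String OPERATOR_UID = "broadcast-operator";
    static final String STATE_NAME = "keySetState";
    static final int MAX_PARALLELISM = 128;

    static final MapStateDescriptor<String, String> DESCRIPTOR = new MapStateDescriptor<>(
            STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    // Writes every input element into the broadcast state as key -> key.
    static class KeySetBootstrapFunction extends BroadcastStateBootstrapFunction<String> {
        @Override
        public void processElement(String value, Context ctx) throws Exception {
            ctx.getBroadcastState(DESCRIPTOR).put(value, value);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The initial data does not have to come from a savepoint; any DataSet works
        // (a file, a database query, or a fixed collection as here).
        DataSet<String> initialKeys = env.fromElements("a", "b", "c");

        BootstrapTransformation<String> transform = OperatorTransformation
                .bootstrapWith(initialKeys)
                .transform(new KeySetBootstrapFunction());

        // The second argument is the max parallelism of the new savepoint.
        Savepoint.create(new MemoryStateBackend(), MAX_PARALLELISM)
                .withOperator(OPERATOR_UID, transform)
                .write("file:///tmp/new_savepoints");

        // write(...) only adds a sink to the plan; execute() actually runs it.
        env.execute("bootstrap broadcast state");
    }
}
```

Because `write(...)` only registers a DataSet sink, nothing is written until `env.execute(...)` runs. Note also that the second argument of `Savepoint.create` is the max parallelism, not the job's parallelism.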
Hi Seth,
Thanks for the prompt response! Regarding my second question: once I have converted the existing savepoint to a DataSet, how can I turn that DataSet into BroadcastState? For example, in my BroadcastProcessFunction:

    @Override
    public void processBroadcastElement(String key, Context context, Collector<JsonNode> collector) throws Exception {
        // TODO: how to add the existing BroadcastState from the savepoint beforehand?
        BroadcastState<String, String> broadcastState = context.getBroadcastState(keySetStateDescriptor);
        broadcastState.put(key, key);
    }

Thanks a lot!
Eleanore

On Tue, Jan 21, 2020 at 7:12 AM Seth Wiesman <[hidden email]> wrote:

> Hi Eleanore,
>
> Bootstrap data is not required to come from an existing savepoint. It can
> come from any DataSet, which could be backed by a file, a database, or any
> other system. The State Processor API is also not a tool you are going to
> use between every start and stop of your job. It is only for bootstrapping
> the initial state of your application. After that, you will use savepoints
> to carry over the current state of your application between runs.
>
> On Mon, Jan 20, 2020 at 6:07 PM Jin Yi <[hidden email]> wrote:
>
>> [...]
>
> --
> Seth Wiesman | Solutions Architect
> <https://www.ververica.com/>
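A sketch for the follow-up question, assuming the same Flink 1.9/1.10 setup: the DataSet is not converted into BroadcastState inside the function. Instead it is written into a new savepoint with a `BroadcastStateBootstrapFunction`, and the streaming job is then started from that savepoint (for example `bin/flink run -s <savepointPath> <jobJar>`). As long as the broadcast operator's uid and the `MapStateDescriptor` name match the ones used when the savepoint was written, `getBroadcastState(...)` already returns the restored entries in both `processElement` and `processBroadcastElement`. The class name, uid, descriptor name, the placeholder `fromElements` sources and the `String` output type (instead of `JsonNode`) are assumptions made to keep the example small.

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastJob {

    // Hypothetical values: must match the uid and descriptor name used when writing the savepoint.
    static final String OPERATOR_UID = "broadcast-operator";
    static final MapStateDescriptor<String, String> DESCRIPTOR = new MapStateDescriptor<>(
            "keySetState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    static class KeySetFunction extends BroadcastProcessFunction<String, String, String> {
        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            // Entries restored from the savepoint are already visible here.
            ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(DESCRIPTOR);
            if (state.contains(value)) {
                out.collect(value);
            }
        }

        @Override
        public void processBroadcastElement(String key, Context ctx, Collector<String> out) throws Exception {
            // New broadcast elements are added on top of the restored entries.
            BroadcastState<String, String> state = ctx.getBroadcastState(DESCRIPTOR);
            state.put(key, key);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder sources; replace with the real broadcast and main streams.
        DataStream<String> keys = env.fromElements("d");
        DataStream<String> events = env.fromElements("a", "d", "z");

        BroadcastStream<String> broadcast = keys.broadcast(DESCRIPTOR);

        events.connect(broadcast)
                .process(new KeySetFunction())
                .uid(OPERATOR_UID) // same uid as passed to Savepoint.withOperator(...)
                .print();

        env.execute("broadcast job");
    }
}
```

For step 3, a savepoint of the running job can be taken at any time with `bin/flink savepoint <jobId> [targetDirectory]`; restarting with `-s` from that path carries the current broadcast state over, which matches Seth's advice to use regular savepoints after the initial bootstrap.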