TaggedOperatorSubtaskState is missing when creating a new savepoint using state processor api


冰已融化
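I am using the State Processor API to create a new savepoint that contains the Kafka-related state, in order to change the max parallelism. The savepoint is created successfully, but when I restart the job from it, the following exception is thrown: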
 java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e8ea6e352a1a627513ffbd4573fa1628_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
    at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
    at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more


This is how I use it:
@Override
public void createNewSavepoint(ExecutionEnvironment env, String savepointPath, StateBackend stateBackend,
        ParameterTool config) {
    // Output path for the new savepoint
    String savepointOutputPath = config.get(EapSavepointConstants.EAP_SAVEPOINT_OUTPUT_PATH);
    // New max parallelism
    int maxParallelism = config.getInt(EapSavepointConstants.EAP_SAVEPOINT_MAX_PARALLELISM);

    try {
        // Load the existing savepoint, whose max parallelism has not been modified
        ExistingSavepoint existingSavepoint = Savepoint.load(env, savepointPath, stateBackend);

        // Read the Kafka-related state
        DataSet<Tuple2<KafkaTopicPartition, Long>> kafkaListState = existingSavepoint.readUnionState(
                OperatorUidAndNameConstants.KAFKA_SOURCE_UID, StateNameConstants.KAFKA_OFFSET_STATE_NAME,
                KafkaStateUtils.createTypeInformation(),
                KafkaStateUtils.createStateDescriptorSerializer(env.getConfig()));
        logger.info("Print kafka offset");
        kafkaListState.print();
        BootstrapTransformation<Tuple2<KafkaTopicPartition, Long>> kafkaTransformation =
                OperatorTransformation.bootstrapWith(kafkaListState).transform(new KafkaSourceBootstrapFunction());

        Savepoint.create(stateBackend, maxParallelism)
                .withOperator(OperatorUidAndNameConstants.KAFKA_SOURCE_UID, kafkaTransformation)
                .write(savepointOutputPath);
    } catch (IOException e) {
        logger.error("Savepoint load: " + e.getMessage());
        e.printStackTrace();
    } catch (Exception e) {
        logger.error("print state: " + e.getMessage());
        e.printStackTrace();
    }
}




// KafkaStateUtils.java


public class KafkaStateUtils {
    /**
     * Creates the state serializer for the Kafka topic partition to offset tuple.
     * An explicit state serializer with KryoSerializer is needed because otherwise
     * users cannot use the 'disableGenericTypes' property with KafkaConsumer.
     *
     * @param executionConfig the execution config passed to the KryoSerializer
     * @return the tuple serializer for (KafkaTopicPartition, Long) state entries
     */
    public static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateDescriptorSerializer(
            ExecutionConfig executionConfig) {
        // An explicit serializer keeps compatibility with GenericTypeInformation
        // and allows users to call disableGenericTypes
        TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[]{
                new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
                LongSerializer.INSTANCE
        };
        @SuppressWarnings("unchecked")
        Class<Tuple2<KafkaTopicPartition, Long>> tupleClass =
                (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
        return new TupleSerializer<>(tupleClass, fieldSerializers);
    }

    public static TypeInformation<Tuple2<KafkaTopicPartition, Long>> createTypeInformation() {
        return TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {});
    }
}


// KafkaSourceBootstrapFunction.java
public class KafkaSourceBootstrapFunction extends StateBootstrapFunction<Tuple2<KafkaTopicPartition, Long>> {
    private ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

    @Override
    public void processElement(Tuple2<KafkaTopicPartition, Long> value, Context context) throws Exception {
        unionOffsetStates.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        unionOffsetStates = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<Tuple2<KafkaTopicPartition, Long>>(StateNameConstants.KAFKA_OFFSET_STATE_NAME,
                        KafkaStateUtils.createStateDescriptorSerializer(getRuntimeContext().getExecutionConfig())));
    }
}




That is how I am using the API. Could anyone offer some advice? After remote debugging, I found that only `org.apache.flink.state.api.output.TaggedOperatorSubtaskState` fails to be deserialized from the new savepoint.
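
For readers unfamiliar with the innermost frame of the trace: the message appears to come from unwrapping a map of optional values, where an entry is empty because a class name recorded in the savepoint could not be resolved when the job restored. The sketch below is a simplified, standard-library-only model of that pattern, not Flink's actual `LinkedOptionalMap` code. The class `OptionalRegistrationSketch` and its methods `resolve` and `unwrap` are hypothetical names introduced for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified model: class names recorded in a snapshot are resolved again on restore. */
final class OptionalRegistrationSketch {

    /** Resolves each recorded class name on the current classpath; unresolvable names map to empty. */
    static Map<String, Optional<Class<?>>> resolve(Iterable<String> recordedClassNames) {
        Map<String, Optional<Class<?>>> resolved = new LinkedHashMap<>();
        for (String name : recordedClassNames) {
            Optional<Class<?>> value;
            try {
                value = Optional.<Class<?>>of(Class.forName(name));
            } catch (ClassNotFoundException e) {
                // The class was known when the snapshot was written but is absent now.
                value = Optional.empty();
            }
            resolved.put(name, value);
        }
        return resolved;
    }

    /** Unwraps every entry and fails on the first empty one, with a message shaped like the one in the trace. */
    static Map<String, Class<?>> unwrap(Map<String, Optional<Class<?>>> resolved) {
        Map<String, Class<?>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<Class<?>>> entry : resolved.entrySet()) {
            Class<?> clazz = entry.getValue().orElseThrow(() ->
                    new IllegalStateException("Missing value for the key '" + entry.getKey() + "'"));
            result.put(entry.getKey(), clazz);
        }
        return result;
    }
}
```

If this model matches the real behavior, the exception would mean that `TaggedOperatorSubtaskState` was recorded in the serializer snapshot written by the State Processor API job but cannot be resolved on the classpath of the restoring job. That is an assumption to verify, not a confirmed diagnosis.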

Best regards,
clare