Hi all,
I am using the State Processor API to create a new savepoint that contains the Kafka-related state of an existing savepoint, in order to change the max parallelism. The new savepoint is created successfully, but when I restart the job from it, the following exception is thrown:
{code}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e8ea6e352a1a627513ffbd4573fa1628_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
    at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
    at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more
{code}
My usage is as follows:
{code}
@Override
public void createNewSavepoint(ExecutionEnvironment env, String savepointPath, StateBackend stateBackend, ParameterTool config) {
    // Output path for the new savepoint
    String savepointOutputPath = config.get(EapSavepointConstants.EAP_SAVEPOINT_OUTPUT_PATH);
    // The new max parallelism
    int maxParallelism = config.getInt(EapSavepointConstants.EAP_SAVEPOINT_MAX_PARALLELISM);
    try {
        // Load the savepoint whose max parallelism has not been changed yet
        ExistingSavepoint existingSavepoint = Savepoint.load(env, savepointPath, stateBackend);
        // Read the Kafka-related state
        DataSet<Tuple2<KafkaTopicPartition, Long>> kafkaListState = existingSavepoint.readUnionState(
                OperatorUidAndNameConstants.KAFKA_SOURCE_UID,
                StateNameConstants.KAFKA_OFFSET_STATE_NAME,
                KafkaStateUtils.createTypeInformation(),
                KafkaStateUtils.createStateDescriptorSerializer(env.getConfig()));
        logger.info("Print kafka offset");
        kafkaListState.print();
        BootstrapTransformation<Tuple2<KafkaTopicPartition, Long>> kafkaTransformation =
                OperatorTransformation.bootstrapWith(kafkaListState).transform(new KafkaSourceBootstrapFunction());
        Savepoint.create(stateBackend, maxParallelism)
                .withOperator(OperatorUidAndNameConstants.KAFKA_SOURCE_UID, kafkaTransformation)
                .write(savepointOutputPath);
    } catch (IOException e) {
        logger.error("Savepoint load: " + e.getMessage());
        e.printStackTrace();
    } catch (Exception e) {
        logger.error("print state: " + e.getMessage());
        e.printStackTrace();
    }
}

// KafkaStateUtils.java
public class KafkaStateUtils {

    /**
     * Creates the state serializer for the Kafka topic-partition-to-offset tuple.
     * An explicit state serializer built on KryoSerializer is needed because otherwise
     * users cannot use 'disableGenericTypes' together with the KafkaConsumer.
     * @param executionConfig the job's ExecutionConfig
     * @return the tuple serializer
     */
    public static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateDescriptorSerializer(
            ExecutionConfig executionConfig) {
        // The explicit serializer keeps compatibility with GenericTypeInformation
        // and allows users to disableGenericTypes
        TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[]{
                new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
                LongSerializer.INSTANCE
        };
        @SuppressWarnings("unchecked")
        Class<Tuple2<KafkaTopicPartition, Long>> tupleClass =
                (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
        return new TupleSerializer<>(tupleClass, fieldSerializers);
    }

    public static TypeInformation<Tuple2<KafkaTopicPartition, Long>> createTypeInformation() {
        return TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {});
    }
}

// KafkaSourceBootstrapFunction.java
public class KafkaSourceBootstrapFunction extends StateBootstrapFunction<Tuple2<KafkaTopicPartition, Long>> {

    private ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

    @Override
    public void processElement(Tuple2<KafkaTopicPartition, Long> value, Context context) throws Exception {
        unionOffsetStates.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        unionOffsetStates = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<Tuple2<KafkaTopicPartition, Long>>(StateNameConstants.KAFKA_OFFSET_STATE_NAME,
                        KafkaStateUtils.createStateDescriptorSerializer(getRuntimeContext().getExecutionConfig())));
    }
}
{code}
That is how I am using the API. Could anyone give me some advice on this usage? After remote debugging, I found that only `org.apache.flink.state.api.output.TaggedOperatorSubtaskState` fails to be deserialized from the new savepoint.

Best regards,
clare
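P.S. In case it helps the discussion, below is a minimal, unverified sketch of the workaround I am considering. It assumes the restore fails because the restoring streaming job cannot resolve the `TaggedOperatorSubtaskState` class recorded in the savepoint's Kryo serializer snapshot, so it adds `flink-state-processor-api` to the streaming job's classpath and registers the class with Kryo. Whether this actually fixes the restore is only my assumption.
{code}
// Unverified workaround sketch: make the class recorded in the Kryo
// serializer snapshot resolvable when the streaming job restores state.
// Assumes flink-state-processor-api is a dependency of the streaming job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerKryoType(
        org.apache.flink.state.api.output.TaggedOperatorSubtaskState.class);
{code}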