如下,我使用到了Guava的BloomFilter,貌似是基于kyro序列化的。检查点成功了,但是基于检查点恢复任务是失败的。
报错堆栈如下,关键错误是什么无法访问public修饰的成员? java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators. StreamTaskStateInitializerImpl.streamOperatorStateContext( StreamTaskStateInitializerImpl.java:235) at org.apache.flink.streaming.api.operators.AbstractStreamOperator .initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.OperatorChain .initializeStateAndOpenOperators(OperatorChain.java:400) at org.apache.flink.streaming.runtime.tasks.StreamTask .lambda$beforeInvoke$2(StreamTask.java:507) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1 .runThrowing(StreamTaskActionExecutor.java:47) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke( StreamTask.java:501) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask .java:531) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5/ 30) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure .createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators. StreamTaskStateInitializerImpl.keyedStatedBackend( StreamTaskStateInitializerImpl.java:316) at org.apache.flink.streaming.api.operators. StreamTaskStateInitializerImpl.streamOperatorStateContext( StreamTaskStateInitializerImpl.java:155) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder .build(HeapKeyedStateBackendBuilder.java:116) at org.apache.flink.runtime.state.filesystem.FsStateBackend .createKeyedStateBackend(FsStateBackend.java:540) at org.apache.flink.runtime.state.filesystem.FsStateBackend .createKeyedStateBackend(FsStateBackend.java:100) at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend( StateBackend.java:178) at org.apache.flink.streaming.api.operators. StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1( StreamTaskStateInitializerImpl.java:299) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure .attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure .createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: com.google.common.hash.LongAdder Serialization trace: bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray) bits (com.google.common.hash.BloomFilter) at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala: 136) at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) at com.esotericsoftware.kryo.serializers.FieldSerializer.create( FieldSerializer.java:547) at com.esotericsoftware.kryo.serializers.FieldSerializer.read( FieldSerializer.java:523) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField .java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read( FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField .java:113) at com.esotericsoftware.kryo.serializers.FieldSerializer.read( FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read( MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read( MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer .deserialize(KryoSerializer.java:346) at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders .lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77) at org.apache.flink.runtime.state. KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup( KeyGroupPartitioner.java:297) at org.apache.flink.runtime.state.heap.HeapRestoreOperation .readKeyGroupStateData(HeapRestoreOperation.java:299) at org.apache.flink.runtime.state.heap.HeapRestoreOperation .readStateHandleStateData(HeapRestoreOperation.java:260) at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore( HeapRestoreOperation.java:160) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder .build(HeapKeyedStateBackendBuilder.java:114) ... 17 more Caused by: java.lang.IllegalAccessException: Class com.twitter.chill. Instantiators$$anonfun$normalJava$1 can not access a member of class com.google.common.hash.LongAdder with modifiers "public" at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) at java.lang.reflect.AccessibleObject.slowCheckMemberAccess( AccessibleObject.java:296) at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java: 288) at java.lang.reflect.Constructor.newInstance(Constructor.java:413) at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase .scala:170) at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala: 133) ... 37 more |
这个问题有人知道吗?
我知道原因了,应该是LongAdder这个类非public,但不知道除了覆盖这个类还有什么解决方案吗?从Flink层面有对这种情况的解决吗。 毕竟需要用到的类不一定是用户自定义的,就无法保证一定符合某些规则。比如这里的Guava中的LongAdder非public的问题,我目前覆盖了这个类是可以解决的。不清楚有没有其他解决方式。 赵一旦 <[hidden email]> 于2021年1月28日周四 下午6:03写道: > 如下,我使用到了Guava的BloomFilter,貌似是基于kyro序列化的。检查点成功了,但是基于检查点恢复任务是失败的。 > 报错堆栈如下,关键错误是什么无法访问public修饰的成员? > > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at org.apache.flink.streaming.api.operators. > StreamTaskStateInitializerImpl.streamOperatorStateContext( > StreamTaskStateInitializerImpl.java:235) > at org.apache.flink.streaming.api.operators.AbstractStreamOperator > .initializeState(AbstractStreamOperator.java:248) > at org.apache.flink.streaming.runtime.tasks.OperatorChain > .initializeStateAndOpenOperators(OperatorChain.java:400) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .lambda$beforeInvoke$2(StreamTask.java:507) > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1 > .runThrowing(StreamTaskActionExecutor.java:47) > at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke( > StreamTask.java:501) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke( > StreamTask.java:531) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5 > /30) from any of the 1 provided restore options. > at org.apache.flink.streaming.api.operators.BackendRestorerProcedure > .createAndRestore(BackendRestorerProcedure.java:135) > at org.apache.flink.streaming.api.operators. > StreamTaskStateInitializerImpl.keyedStatedBackend( > StreamTaskStateInitializerImpl.java:316) > at org.apache.flink.streaming.api.operators. > StreamTaskStateInitializerImpl.streamOperatorStateContext( > StreamTaskStateInitializerImpl.java:155) > ... 9 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend > at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder > .build(HeapKeyedStateBackendBuilder.java:116) > at org.apache.flink.runtime.state.filesystem.FsStateBackend > .createKeyedStateBackend(FsStateBackend.java:540) > at org.apache.flink.runtime.state.filesystem.FsStateBackend > .createKeyedStateBackend(FsStateBackend.java:100) > at org.apache.flink.runtime.state.StateBackend > .createKeyedStateBackend(StateBackend.java:178) > at org.apache.flink.streaming.api.operators. > StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1( > StreamTaskStateInitializerImpl.java:299) > at org.apache.flink.streaming.api.operators.BackendRestorerProcedure > .attemptCreateAndRestore(BackendRestorerProcedure.java:142) > at org.apache.flink.streaming.api.operators.BackendRestorerProcedure > .createAndRestore(BackendRestorerProcedure.java:121) > ... 11 more > Caused by: com.esotericsoftware.kryo.KryoException: Error constructing > instance of class: com.google.common.hash.LongAdder > Serialization trace: > bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray) > bits (com.google.common.hash.BloomFilter) > at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala: > 136) > at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) > at com.esotericsoftware.kryo.serializers.FieldSerializer.create( > FieldSerializer.java:547) > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > FieldSerializer.java:523) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField > .java:106) > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730) > at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField > .java:113) > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at com.esotericsoftware.kryo.serializers.MapSerializer.read( > MapSerializer.java:143) > at com.esotericsoftware.kryo.serializers.MapSerializer.read( > MapSerializer.java:21) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer > .deserialize(KryoSerializer.java:346) > at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders > .lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77) > at org.apache.flink.runtime.state. > KeyGroupPartitioner$PartitioningResultKeyGroupReader > .readMappingsInKeyGroup(KeyGroupPartitioner.java:297) > at org.apache.flink.runtime.state.heap.HeapRestoreOperation > .readKeyGroupStateData(HeapRestoreOperation.java:299) > at org.apache.flink.runtime.state.heap.HeapRestoreOperation > .readStateHandleStateData(HeapRestoreOperation.java:260) > at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore( > HeapRestoreOperation.java:160) > at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder > .build(HeapKeyedStateBackendBuilder.java:114) > ... 17 more > Caused by: java.lang.IllegalAccessException: Class com.twitter.chill. > Instantiators$$anonfun$normalJava$1 can not access a member of class > com.google.common.hash.LongAdder with modifiers "public" > at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) > at java.lang.reflect.AccessibleObject.slowCheckMemberAccess( > AccessibleObject.java:296) > at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject > .java:288) > at java.lang.reflect.Constructor.newInstance(Constructor.java:413) > at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply( > KryoBase.scala:170) > at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala: > 133) > ... 37 more > > |
自定义 state 的 type-serializer,可以尝试下 SimpleVersionedSerializer/avro/protobuf,
etc. 复杂状态应该尽量避免使用依赖反射的 serializer,java Serializable 同理,这些都不好做 state upgrade。 On January 31, 2021 at 11:29:25, 赵一旦 ([hidden email]) wrote: 这个问题有人知道吗? 我知道原因了,应该是LongAdder这个类非public,但不知道除了覆盖这个类还有什么解决方案吗?从Flink层面有对这种情况的解决吗。 毕竟需要用到的类不一定是用户自定义的,就无法保证一定符合某些规则。比如这里的Guava中的LongAdder非public的问题,我目前覆盖了这个类是可以解决的。不清楚有没有其他解决方式。 赵一旦 <[hidden email]> 于2021年1月28日周四 下午6:03写道: > 如下,我使用到了Guava的BloomFilter,貌似是基于kyro序列化的。检查点成功了,但是基于检查点恢复任务是失败的。 > 报错堆栈如下,关键错误是什么无法访问public修饰的成员? > > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at org.apache.flink.streaming.api.operators. > StreamTaskStateInitializerImpl.streamOperatorStateContext( > StreamTaskStateInitializerImpl.java:235) > at org.apache.flink.streaming.api.operators.AbstractStreamOperator > .initializeState(AbstractStreamOperator.java:248) > at org.apache.flink.streaming.runtime.tasks.OperatorChain > .initializeStateAndOpenOperators(OperatorChain.java:400) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .lambda$beforeInvoke$2(StreamTask.java:507) > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1 > .runThrowing(StreamTaskActionExecutor.java:47) > at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke( > StreamTask.java:501) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke( > StreamTask.java:531) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for > /30) from any of the 1 provided restore options. > at org.apache.flink.streaming.api.operators.BackendRestorerProcedure > .createAndRestore(BackendRestorerProcedure.java:135) > at org.apache.flink.streaming.api.operators. > StreamTaskStateInitializerImpl.keyedStatedBackend( > StreamTaskStateInitializerImpl.java:316) > at org.apache.flink.streaming.api.operators. > StreamTaskStateInitializerImpl.streamOperatorStateContext( > StreamTaskStateInitializerImpl.java:155) > ... 9 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: > when trying to restore heap backend > at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder > .build(HeapKeyedStateBackendBuilder.java:116) > at org.apache.flink.runtime.state.filesystem.FsStateBackend > .createKeyedStateBackend(FsStateBackend.java:540) > at org.apache.flink.runtime.state.filesystem.FsStateBackend > .createKeyedStateBackend(FsStateBackend.java:100) > at org.apache.flink.runtime.state.StateBackend > .createKeyedStateBackend(StateBackend.java:178) > at org.apache.flink.streaming.api.operators. > StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1( > StreamTaskStateInitializerImpl.java:299) > at org.apache.flink.streaming.api.operators.BackendRestorerProcedure > .attemptCreateAndRestore(BackendRestorerProcedure.java:142) > at org.apache.flink.streaming.api.operators.BackendRestorerProcedure > .createAndRestore(BackendRestorerProcedure.java:121) > ... 11 more > Caused by: com.esotericsoftware.kryo.KryoException: Error constructing > instance of class: com.google.common.hash.LongAdder > Serialization trace: > bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray) > bits (com.google.common.hash.BloomFilter) > at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala: > 136) > at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) > at com.esotericsoftware.kryo.serializers.FieldSerializer.create( > FieldSerializer.java:547) > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > FieldSerializer.java:523) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField > .java:106) > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730) > at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField > .java:113) > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at com.esotericsoftware.kryo.serializers.MapSerializer.read( > MapSerializer.java:143) > at com.esotericsoftware.kryo.serializers.MapSerializer.read( > MapSerializer.java:21) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer > .deserialize(KryoSerializer.java:346) > at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders > .lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77) > at org.apache.flink.runtime.state. > KeyGroupPartitioner$PartitioningResultKeyGroupReader > .readMappingsInKeyGroup(KeyGroupPartitioner.java:297) > at org.apache.flink.runtime.state.heap.HeapRestoreOperation > .readKeyGroupStateData(HeapRestoreOperation.java:299) > at org.apache.flink.runtime.state.heap.HeapRestoreOperation > .readStateHandleStateData(HeapRestoreOperation.java:260) > at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore( > HeapRestoreOperation.java:160) > at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder > .build(HeapKeyedStateBackendBuilder.java:114) > ... 17 more > Caused by: java.lang.IllegalAccessException: Class com.twitter.chill. > Instantiators$$anonfun$normalJava$1 can not access a member of class > com.google.common.hash.LongAdder with modifiers "public" > at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) > at java.lang.reflect.AccessibleObject.slowCheckMemberAccess( > AccessibleObject.java:296) > at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject > .java:288) > at java.lang.reflect.Constructor.newInstance(Constructor.java:413) > at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply( > KryoBase.scala:170) > at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala: > 133) > ... 37 more > > |
@Kezhu Wang <[hidden email]> Hi.
最近序列化相关问题遇到好多,如上这个是因为LongAdder非public,这个简单覆盖倒是也能解决。 但是我还遇到好多关于kryo序列化问题,比如我停任务(stop -p)的时候,会在保存点成功的瞬间报错,如何开始进入restarting状态。 报的是kryo的错误: 2021-02-03 11:00:54 com.esotericsoftware.kryo.KryoException: Unable to find class: eU at com.esotericsoftware.kryo.util.DefaultClassResolver.readName( DefaultClassResolver.java:138) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass( DefaultClassResolver.java:115) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) at com.esotericsoftware.kryo.serializers.MapSerializer.read( MapSerializer.java:135) at com.esotericsoftware.kryo.serializers.MapSerializer.read( MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer .deserialize(KryoSerializer.java:346) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer .deserialize(PojoSerializer.java:411) at org.apache.flink.streaming.runtime.streamrecord. StreamElementSerializer.deserialize(StreamElementSerializer.java:202) at org.apache.flink.streaming.runtime.streamrecord. StreamElementSerializer.deserialize(StreamElementSerializer.java:46) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate .read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization. SpillingAdaptiveSpanningRecordDeserializer.getNextRecord( SpillingAdaptiveSpanningRecordDeserializer.java:92) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput .emitNext(StreamTaskNetworkInput.java:145) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor .processInput(StreamOneInputProcessor.java:67) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor .processInput(StreamTwoInputProcessor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput( StreamTask.java:372) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor .runMailboxLoop(MailboxProcessor.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop( StreamTask.java:575) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask .java:539) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassNotFoundException: eU at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at org.apache.flink.util.FlinkUserCodeClassLoader .loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) at org.apache.flink.util.ChildFirstClassLoader .loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass( FlinkUserCodeClassLoader.java:49) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) at org.apache.flink.runtime.execution.librarycache. FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass( FlinkUserCodeClassLoaders.java:168) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at com.esotericsoftware.kryo.util.DefaultClassResolver.readName( DefaultClassResolver.java:136) ... 22 more 目前看下来比较复杂,想问问你说的通过自定义type-serializer的实现。 我不是很清楚,基于 SimpleVersionedSerializer/avro/protobuf 的实现,哪个是可以通用直接都替换现有flink实现(比如类似json这种schema,不需要人工指定任何schema定义就可以完成序列化和反序列化的),哪个是适用于特殊对象特殊处理的(比如需要人工实现序列化和反序列化)。 以及有什么简单的示例吗。 Kezhu Wang <[hidden email]> 于2021年1月31日周日 下午3:09写道: > 自定义 state 的 type-serializer,可以尝试下 SimpleVersionedSerializer/avro/protobuf, > etc. > > 复杂状态应该尽量避免使用依赖反射的 serializer,java Serializable 同理,这些都不好做 state upgrade。 > > On January 31, 2021 at 11:29:25, 赵一旦 ([hidden email]) wrote: > > 这个问题有人知道吗? > 我知道原因了,应该是LongAdder这个类非public,但不知道除了覆盖这个类还有什么解决方案吗?从Flink层面有对这种情况的解决吗。 > > 毕竟需要用到的类不一定是用户自定义的,就无法保证一定符合某些规则。比如这里的Guava中的LongAdder非public的问题,我目前覆盖了这个类是可以解决的。不清楚有没有其他解决方式。 > > > 赵一旦 <[hidden email]> 于2021年1月28日周四 下午6:03写道: > > > 如下,我使用到了Guava的BloomFilter,貌似是基于kyro序列化的。检查点成功了,但是基于检查点恢复任务是失败的。 > > 报错堆栈如下,关键错误是什么无法访问public修饰的成员? > > > > java.lang.Exception: Exception while creating StreamOperatorStateContext. > > at org.apache.flink.streaming.api.operators. > > StreamTaskStateInitializerImpl.streamOperatorStateContext( > > StreamTaskStateInitializerImpl.java:235) > > at org.apache.flink.streaming.api.operators.AbstractStreamOperator > > .initializeState(AbstractStreamOperator.java:248) > > at org.apache.flink.streaming.runtime.tasks.OperatorChain > > .initializeStateAndOpenOperators(OperatorChain.java:400) > > at org.apache.flink.streaming.runtime.tasks.StreamTask > > .lambda$beforeInvoke$2(StreamTask.java:507) > > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1 > > .runThrowing(StreamTaskActionExecutor.java:47) > > at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke( > > StreamTask.java:501) > > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke( > > StreamTask.java:531) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > > state backend for > KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5 > > /30) from any of the 1 provided restore options. > > at org.apache.flink.streaming.api.operators.BackendRestorerProcedure > > .createAndRestore(BackendRestorerProcedure.java:135) > > at org.apache.flink.streaming.api.operators. > > StreamTaskStateInitializerImpl.keyedStatedBackend( > > StreamTaskStateInitializerImpl.java:316) > > at org.apache.flink.streaming.api.operators. > > StreamTaskStateInitializerImpl.streamOperatorStateContext( > > StreamTaskStateInitializerImpl.java:155) > > ... 9 more > > Caused by: org.apache.flink.runtime.state.BackendBuildingException: > Failed > > when trying to restore heap backend > > at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder > > .build(HeapKeyedStateBackendBuilder.java:116) > > at org.apache.flink.runtime.state.filesystem.FsStateBackend > > .createKeyedStateBackend(FsStateBackend.java:540) > > at org.apache.flink.runtime.state.filesystem.FsStateBackend > > .createKeyedStateBackend(FsStateBackend.java:100) > > at org.apache.flink.runtime.state.StateBackend > > .createKeyedStateBackend(StateBackend.java:178) > > at org.apache.flink.streaming.api.operators. > > StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1( > > StreamTaskStateInitializerImpl.java:299) > > at org.apache.flink.streaming.api.operators.BackendRestorerProcedure > > .attemptCreateAndRestore(BackendRestorerProcedure.java:142) > > at org.apache.flink.streaming.api.operators.BackendRestorerProcedure > > .createAndRestore(BackendRestorerProcedure.java:121) > > ... 11 more > > Caused by: com.esotericsoftware.kryo.KryoException: Error constructing > > instance of class: com.google.common.hash.LongAdder > > Serialization trace: > > bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray) > > bits (com.google.common.hash.BloomFilter) > > at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala: > > 136) > > at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) > > at com.esotericsoftware.kryo.serializers.FieldSerializer.create( > > FieldSerializer.java:547) > > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > > FieldSerializer.java:523) > > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > > at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField > > .java:106) > > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > > FieldSerializer.java:528) > > at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730) > > at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField > > .java:113) > > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > > FieldSerializer.java:528) > > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > > at com.esotericsoftware.kryo.serializers.MapSerializer.read( > > MapSerializer.java:143) > > at com.esotericsoftware.kryo.serializers.MapSerializer.read( > > MapSerializer.java:21) > > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer > > .deserialize(KryoSerializer.java:346) > > at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders > > .lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77) > > at org.apache.flink.runtime.state. > > KeyGroupPartitioner$PartitioningResultKeyGroupReader > > .readMappingsInKeyGroup(KeyGroupPartitioner.java:297) > > at org.apache.flink.runtime.state.heap.HeapRestoreOperation > > .readKeyGroupStateData(HeapRestoreOperation.java:299) > > at org.apache.flink.runtime.state.heap.HeapRestoreOperation > > .readStateHandleStateData(HeapRestoreOperation.java:260) > > at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore( > > HeapRestoreOperation.java:160) > > at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder > > .build(HeapKeyedStateBackendBuilder.java:114) > > ... 17 more > > Caused by: java.lang.IllegalAccessException: Class com.twitter.chill. > > Instantiators$$anonfun$normalJava$1 can not access a member of class > > com.google.common.hash.LongAdder with modifiers "public" > > at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) > > at java.lang.reflect.AccessibleObject.slowCheckMemberAccess( > > AccessibleObject.java:296) > > at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject > > .java:288) > > at java.lang.reflect.Constructor.newInstance(Constructor.java:413) > > at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply( > > KryoBase.scala:170) > > at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala: > > 133) > > ... 37 more > > > > > |
Free forum by Nabble | Edit this page |