Checkpoint succeeds, but restoring from the checkpoint fails. I'm using Guava's BloomFilter; can anyone help analyze this?

nobleyd
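As shown below, I am using Guava's BloomFilter, which appears to be serialized with Kryo. The checkpoint succeeded, but restoring the job from that checkpoint fails.
The stack trace follows. The key error seems to be that a member with modifiers "public" cannot be accessed?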

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:235)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5/30) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:540)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:100)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:178)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: com.google.common.hash.LongAdder
Serialization trace:
bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray)
bits (com.google.common.hash.BloomFilter)
    at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:136)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:299)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:260)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:160)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 17 more
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class com.google.common.hash.LongAdder with modifiers "public"
    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
    at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
    at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:413)
    at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:170)
    at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:133)
    ... 37 more

Re: Checkpoint succeeds, but restoring from the checkpoint fails. I'm using Guava's BloomFilter; can anyone help analyze this?

nobleyd
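Does anyone know about this problem?
I have figured out the cause: the LongAdder class is not public. Is there any solution other than overriding this class with my own copy? Does Flink itself offer anything for this situation?
After all, the classes involved are not necessarily user-defined, so there is no guarantee that they follow any particular rules. Take the non-public LongAdder in Guava here: overriding the class does fix it for now, but I don't know whether there are other ways.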
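
To illustrate why the message reports modifiers "public" even though access is denied: Java's reflective access check considers the declaring class as well as the member. Below is a minimal sketch, assuming two hypothetical classes, `AccessDemo` and `HiddenCounter` (a stand-in for a package-private library class such as `com.google.common.hash.LongAdder`); neither is Guava, chill, or Flink code. Because both classes share a package here, the sketch cannot reproduce the exception itself. It only shows that a non-public class can carry a public constructor, and that `setAccessible(true)` is the standard reflection call for suppressing the check.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

class AccessDemo {
    // True when the class declaration itself carries the public modifier.
    static boolean classIsPublic(Class<?> type) {
        return Modifier.isPublic(type.getModifiers());
    }

    // True when the no-arg constructor carries the public modifier.
    static boolean constructorIsPublic(Class<?> type) throws NoSuchMethodException {
        return Modifier.isPublic(type.getDeclaredConstructor().getModifiers());
    }

    // Instantiates reflectively after suppressing Java language access checks.
    static <T> T newInstanceIgnoringAccess(Class<T> type) throws ReflectiveOperationException {
        Constructor<T> ctor = type.getDeclaredConstructor();
        ctor.setAccessible(true);
        return ctor.newInstance();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("class public: " + classIsPublic(HiddenCounter.class));
        System.out.println("constructor public: " + constructorIsPublic(HiddenCounter.class));
        System.out.println("instantiated: " + (newInstanceIgnoringAccess(HiddenCounter.class) != null));
    }
}

// Hypothetical stand-in: the class is package-private, its constructor is public.
class HiddenCounter {
    long value;

    public HiddenCounter() {}
}
```

A caller in a different package that invokes `newInstance()` on this constructor without `setAccessible(true)` is rejected because the class is not public, which matches the shape of the error in the trace.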

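赵一旦 <[hidden email]> wrote on Thu, Jan 28, 2021 at 6:03 PM:

> As shown below, I am using Guava's BloomFilter, which appears to be serialized with Kryo. The checkpoint succeeded, but restoring the job from that checkpoint fails.
> The stack trace follows. The key error seems to be that a member with modifiers "public" cannot be accessed?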

Re: Checkpoint succeeds, but restoring from the checkpoint fails. I'm using Guava's BloomFilter; can anyone help analyze this?

Kezhu Wang
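For a custom type serializer for your state, you could try SimpleVersionedSerializer, Avro, protobuf, etc.

Complex state should avoid reflection-based serializers as far as possible; the same goes for Java Serializable. None of these make state upgrades easy.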
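
As a rough illustration of the advice above, here is a minimal, stdlib-only sketch of an explicit, versioned binary format for a bloom-filter-like state. Everything in it is an assumption made for illustration: `VersionedSerializer` is a local interface that only mirrors the method shape of Flink's `SimpleVersionedSerializer` (`getVersion`, `serialize`, `deserialize(version, bytes)`), `FilterState` is a hypothetical state class rather than Guava's `BloomFilter`, and the version 1 layout with its default hash count is invented to show an upgrade path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

class VersionedSerializerDemo {
    // Local stand-in that mirrors the shape of a versioned serializer interface.
    interface VersionedSerializer<T> {
        int getVersion();
        byte[] serialize(T value) throws IOException;
        T deserialize(int version, byte[] bytes) throws IOException;
    }

    // Hypothetical bloom-filter-like state: a hash count plus a bit array.
    static final class FilterState {
        final int numHashFunctions;
        final long[] bits;

        FilterState(int numHashFunctions, long[] bits) {
            this.numHashFunctions = numHashFunctions;
            this.bits = bits;
        }
    }

    static final class FilterStateSerializer implements VersionedSerializer<FilterState> {
        // Assumed default for the invented version 1 layout, which stored only the bits.
        static final int V1_DEFAULT_HASHES = 3;

        @Override
        public int getVersion() {
            return 2;
        }

        @Override
        public byte[] serialize(FilterState state) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                out.writeInt(state.numHashFunctions); // field added in version 2
                out.writeInt(state.bits.length);
                for (long word : state.bits) {
                    out.writeLong(word);
                }
            }
            return buffer.toByteArray();
        }

        @Override
        public FilterState deserialize(int version, byte[] bytes) throws IOException {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
                int hashes;
                switch (version) {
                    case 1: hashes = V1_DEFAULT_HASHES; break; // old layout: no hash count
                    case 2: hashes = in.readInt(); break;
                    default: throw new IOException("Unknown version: " + version);
                }
                long[] bits = new long[in.readInt()];
                for (int i = 0; i < bits.length; i++) {
                    bits[i] = in.readLong();
                }
                return new FilterState(hashes, bits);
            }
        }
    }

    // Serializes with the current version and reads the bytes back.
    static FilterState roundTrip(FilterState state) throws IOException {
        FilterStateSerializer serializer = new FilterStateSerializer();
        return serializer.deserialize(serializer.getVersion(), serializer.serialize(state));
    }

    public static void main(String[] args) throws IOException {
        FilterState restored = roundTrip(new FilterState(5, new long[] {1L, -2L, 42L}));
        System.out.println(restored.numHashFunctions + " " + Arrays.toString(restored.bits));
    }
}
```

No object is constructed reflectively on the read path, so class visibility does not matter, and the version switch leaves room for older layouts. Nothing here touches Flink APIs; wiring such a serializer into a state descriptor is not shown and depends on the Flink version.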

On January 31, 2021 at 11:29:25, 赵一旦 ([hidden email]) wrote:

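> Does anyone know about this problem?
> I have figured out the cause: the LongAdder class is not public. Is there any solution other than overriding this class with my own copy? Does Flink itself offer anything for this situation?
> After all, the classes involved are not necessarily user-defined, so there is no guarantee that they follow any particular rules. Take the non-public LongAdder in Guava here: overriding the class does fix it for now, but I don't know whether there are other ways.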

Re: Checkpoint succeeds, but restoring from the checkpoint fails. I'm using Guava's BloomFilter; can anyone help analyze this?

nobleyd
@Kezhu Wang <[hidden email]> Hi.
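I have run into quite a few serialization problems lately. The one above was caused by LongAdder being non-public, and simply overriding the class does solve it.

But I have also hit many other Kryo serialization problems. For example, when I stop the job (stop -p), an error is thrown the moment the savepoint succeeds, and the job then enters the RESTARTING state.
The error comes from Kryo: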
2021-02-03 11:00:54
com.esotericsoftware.kryo.KryoException: Unable to find class: eU
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: eU
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 22 more

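This looks fairly complicated so far, so I would like to ask about the custom type-serializer approach you mentioned. I am not clear on which of SimpleVersionedSerializer/Avro/protobuf can serve as a generic, drop-in replacement for Flink's existing serialization (something like JSON, where serialization and deserialization work without manually specifying any schema definition), and which are meant for special handling of particular objects (where serialization and deserialization have to be implemented by hand).

Also, are there any simple examples?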

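Kezhu Wang <[hidden email]> wrote on Sun, Jan 31, 2021 at 3:09 PM:

> For a custom type serializer for your state, you could try SimpleVersionedSerializer, Avro, protobuf, etc.
>
> Complex state should avoid reflection-based serializers as far as possible; the same goes for Java Serializable. None of these make state upgrades easy.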