Hi all,
I have a question about deserialization. I have a KeyedCoProcessFunction<T> whose inputs are a log stream and a rule stream.

The data flow is:

    logSource
        .connect(ruleSource)
        .keyBy(...)
        .process(My KeyedCoProcessFunction<>)
        .keyBy(...)
        .print()

The CoProcess function holds two MapStates, one caching logs and one caching rules. Their structures are Map<Long, List<T>> logState and Map<String, Tuple2<Rule, Long>> ruleState. T is fixed when the function is instantiated; here it is MyLog.

At runtime I hit the error below. It looks like the exception is thrown while the downstream operator deserializes records. What causes this error?

Additional note: the exception occurs when Rule goes through the POJO serializer, but not when the Kryo serializer is used.

Caused by: java.lang.IllegalArgumentException: Can not set java.lang.String field org.example.vo.Rule.dimension to org.example.vo.MyLog
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
    at sun.reflect.UnsafeFieldAccessorImpl.ensureObj(UnsafeFieldAccessorImpl.java:58)
    at sun.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:75)
    at java.lang.reflect.Field.set(Field.java:764)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:390)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:121)
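A note on reading the exception message: java.lang.reflect.Field.set throws IllegalArgumentException when the target object is not an instance of the class that declares the field. The message in the trace therefore suggests that a field belonging to Rule was written onto a MyLog instance, i.e. the field list and the instantiated class did not match. The sketch below reproduces only that reflection behavior in plain Java. The Rule and MyLog classes here are hypothetical stand-ins (only the field name dimension comes from the trace), and nothing in it is Flink code.

```java
import java.lang.reflect.Field;

// Hypothetical stand-ins for org.example.vo.Rule and org.example.vo.MyLog.
// Only the field name `dimension` is taken from the stack trace.
class FieldMismatchDemo {
    static class Rule {
        public String dimension;
    }

    static class MyLog {
        public String message;
    }

    /**
     * Tries to write Rule.dimension on the given target object.
     * Returns true if reflection rejects the target, false if the write succeeds.
     */
    static boolean rejectsTarget(Object target) {
        try {
            Field dimension = Rule.class.getDeclaredField("dimension");
            dimension.set(target, "region");
            return false;
        } catch (IllegalArgumentException e) {
            // Thrown when `target` is not an instance of Rule.
            return true;
        } catch (ReflectiveOperationException e) {
            // Missing field or access failure: not expected in this sketch.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Rule target rejected:  " + rejectsTarget(new Rule()));
        System.out.println("MyLog target rejected: " + rejectsTarget(new MyLog()));
    }
}
```

Writing to a Rule instance succeeds, while writing the same field to a MyLog instance is rejected. This does not say why the mismatch arises in the job, only what kind of mismatch the message describes.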
Hi
Judging from the stack trace, the failure happens during deserialization. Since Kryo works and the POJO serializer does not, could you check whether the class meets the requirements for POJO types [1]?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/schema_evolution.html#pojo-types

Best,
Congxian

shizk233 <[hidden email]> wrote on Tue, Aug 18, 2020 at 4:09 PM:
> Hi all,
>
> I have a question about deserialization. [...]
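Regarding Kryo versus POJO: what I mean is that I switch the data class between Lombok's @Value and @Data annotations, which changes its shape and therefore which serializer is used.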
With @Data the class meets the POJO serializer's requirements and that serializer is used, but the exception occurs. What I have found so far is that the List<T> type has to be specified explicitly for the MapState before the POJO serializer gets the correct structure, so I suspect the type information in the MapStateDescriptor is being extracted incorrectly. Oddly, the Kryo serializer does not trigger the exception.

I put together a demo that reproduces the problem [1], in case you have time to take a look.

[1] https://github.com/wangwangdaxian/flink-bug-replay

Congxian Qiu <[hidden email]> wrote on Wed, Aug 19, 2020 at 1:58 PM:
> Hi
> Judging from the stack trace, the failure happens during deserialization. [...]
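As background to the suspicion about type extraction: a declaration such as List<T> carries no concrete element class, so any reflection-based analysis sees only the type variable unless the element type is spelled out somewhere. The plain-Java sketch below shows that general limitation. GenericBuffer, MyLogBuffer, and the buffered field are hypothetical names made up for illustration; this is not Flink's extraction logic and it is not a confirmed diagnosis of the problem in this thread.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

class ErasureDemo {
    // Hypothetical stand-in for the MyLog class mentioned in the thread.
    static class MyLog { }

    // Hypothetical holder that buffers List<T>, like a function generic in T.
    static class GenericBuffer<T> {
        List<T> buffered = new ArrayList<>();
    }

    // Same shape, but with the element type written out explicitly.
    static class MyLogBuffer {
        List<MyLog> buffered = new ArrayList<>();
    }

    /** Returns the declared element type of the `buffered` field of the given class. */
    static Type elementTypeOf(Class<?> owner) {
        try {
            Field field = owner.getDeclaredField("buffered");
            ParameterizedType listType = (ParameterizedType) field.getGenericType();
            return listType.getActualTypeArguments()[0];
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // For the generic holder, the element type is an unresolved type variable.
        System.out.println(elementTypeOf(GenericBuffer.class));
        // For the explicit holder, the element type is the MyLog class itself.
        System.out.println(elementTypeOf(MyLogBuffer.class));
    }
}
```

In Flink, the usual way to avoid relying on extraction is to hand the descriptor explicit type information, for example a TypeInformation built from a TypeHint for the full List<MyLog> type. That matches the observation above that specifying the List<T> type explicitly makes the POJO serializer work.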