Deserialization failure when transferring data between tasks


Deserialization failure when transferring data between tasks

shizk233
Hi all,

I have a question about deserialization. I have a KeyedCoProcessFunction<T> whose inputs are a log stream and a rule stream.
The data flow looks like this:
logSource
    .connect(ruleSource)
    .keyBy(...)
    .process(new MyKeyedCoProcessFunction<>())
    .keyBy(...)
    .print()

Inside the CoProcessFunction there are two MapStates, one caching logs and one caching rules.
Their structures are Map<Long, List<T>> logState and Map<String, Tuple2<Rule, Long>> ruleState.
T is fixed when the function is instantiated; here it is MyLog.
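
For context, a rough sketch of what such a function could look like (the class name, key/output types, constructor, and the way the state is registered are all assumptions; only the two state shapes come from the description above, and Rule/MyLog are the classes from this thread):

import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: class name, key/output types, and the registration style are
// assumptions; the two state shapes (log cache and rule cache) come from the
// description above.
public class MyKeyedCoProcessFunction<T> extends KeyedCoProcessFunction<String, T, Rule, String> {

    // T is only known when the function is instantiated (MyLog here), so its
    // type information has to be handed in from the outside, e.g. via the constructor.
    private final TypeInformation<T> logType;

    private transient MapState<Long, List<T>> logState;               // log cache
    private transient MapState<String, Tuple2<Rule, Long>> ruleState; // rule cache

    public MyKeyedCoProcessFunction(TypeInformation<T> logType) {
        this.logType = logType;
    }

    @Override
    public void open(Configuration parameters) {
        // As discussed later in this thread, the element type of List<T> has to be
        // spelled out explicitly for the POJO serializer to pick up the structure.
        logState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("logState", Types.LONG, Types.LIST(logType)));
        ruleState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("ruleState", Types.STRING,
                        TypeInformation.of(new TypeHint<Tuple2<Rule, Long>>() {})));
    }

    @Override
    public void processElement1(T log, Context ctx, Collector<String> out) throws Exception {
        // buffer the incoming log element in logState (body omitted)
    }

    @Override
    public void processElement2(Rule rule, Context ctx, Collector<String> out) throws Exception {
        // buffer the incoming rule in ruleState (body omitted)
    }
}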

At runtime I ran into the error below. It looks like an exception thrown while a downstream operator deserializes the data. What could be causing it?

Additional note: the exception occurs when Rule uses the POJO serializer, but not when it uses the Kryo serializer.

Caused by: java.lang.IllegalArgumentException: Can not set java.lang.String field org.example.vo.Rule.dimension to org.example.vo.MyLog
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
    at sun.reflect.UnsafeFieldAccessorImpl.ensureObj(UnsafeFieldAccessorImpl.java:58)
    at sun.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:75)
    at java.lang.reflect.Field.set(Field.java:764)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:390)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:121)

Re: Deserialization failure when transferring data between tasks

Congxian Qiu
Hi
    From the stack trace, the error happens during deserialization. Also, since Kryo works but the POJO serializer does not, could you check whether the class meets the POJO requirements [1]?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/schema_evolution.html#pojo-types
Best,
Congxian
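
For reference, a Rule class shaped roughly like this satisfies those requirements (only the dimension field is taken from the stack trace above; the rest is a placeholder):

// Hypothetical shape of a POJO-compliant Rule: public class, public no-arg
// constructor, and every field either public or reachable via public
// getters/setters, with field types Flink can serialize.
public class Rule {

    private String dimension;   // field name taken from the stack trace

    public Rule() {}            // public no-arg constructor is mandatory

    public String getDimension() {
        return dimension;
    }

    public void setDimension(String dimension) {
        this.dimension = dimension;
    }
}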



Re: Deserialization failure when transferring data between tasks

shizk233
Regarding Kryo vs. POJO: I meant that by switching between Lombok's @Value and @Data annotations I change the shape of the data class, and therefore which serializer Flink uses.
With @Data the class meets the POJO serializer requirements and the POJO serializer is used, but then the exception above occurs.

So far I have found that the POJO serializer only derives the structure correctly if I explicitly specify the element type of List<T> in the MapState descriptor (as in the sketch below); I suspect the type information extraction in the MapStateDescriptor goes wrong otherwise.
The strange thing is that the Kryo serializer does not produce this exception.
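
Roughly, the explicit declaration I mean looks like this inside open() (a sketch only; descriptor names are assumed, types as in my first message):

import java.util.List;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch of the explicit type declarations: Types.LIST/TUPLE spell out the nested
// types so nothing is left to extraction from the erased generics, and Types.POJO
// insists on the POJO serializer (it fails early if the class does not qualify).
MapStateDescriptor<Long, List<MyLog>> logStateDesc =
        new MapStateDescriptor<Long, List<MyLog>>(
                "logState",
                Types.LONG,
                Types.LIST(Types.POJO(MyLog.class)));

MapStateDescriptor<String, Tuple2<Rule, Long>> ruleStateDesc =
        new MapStateDescriptor<String, Tuple2<Rule, Long>>(
                "ruleState",
                Types.STRING,
                Types.TUPLE(Types.POJO(Rule.class), Types.LONG));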

I have put together a demo that reproduces the problem [1]; please take a look when you have time.

[1]https://github.com/wangwangdaxian/flink-bug-replay
