|
大家好,请教一下Flink序列化相关的问题。
DataStream<Record> windowCounts = env
.addSource()
.transform();
我flink任务程序中的对象为Record,Record下的Head字段为PB协议的数据,对Head类注册了PB序列化方式。
env.getConfig().registerTypeWithKryoSerializer(
Head.class, ProtobufSerializer.class);
任务运行时发现source->process->transform之间OperatorChain进行对象数据copy时,出现KryoException:[Serializer does not support copy: com.twitter.chill.protobuf.ProtobufSerializer]的异常信息。我的任务程序在processs和transform操作中均未进行PB协议类型的head字段改动,这种情况下我采用自定义序列化方式对head对象进行浅拷贝后,会有啥潜在影响吗?
public class HeadSerializer extends ProtobufSerializer {
@Override
public Message copy(Kryo kryo, Message original) {
return original;
}
}
env.getConfig().registerTypeWithKryoSerializer(
Head.class, HeadSerializer.class);
|