Flink PB序列化疑惑

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Flink PB序列化疑惑

chanamper
    大家好,请教一下Flink序列化相关的问题。

DataStream<Record> windowCounts = env
              .addSource()
              .transform();            
           我flink任务程序中的对象为Record,Record下的Head字段为PB协议的数据,对Head类注册了PB序列化方式。
env.getConfig().registerTypeWithKryoSerializer(
        Head.class, ProtobufSerializer.class);
           任务运行时发现source->process->transform之间OperatorChain进行对象数据copy时,出现KryoException:[Serializer does not support copy: com.twitter.chill.protobuf.ProtobufSerializer]的异常信息。我的任务程序在processs和transform操作中均未进行PB协议类型的head字段改动,这种情况下我采用自定义序列化方式对head对象进行浅拷贝后,会有啥潜在影响吗?
public class HeadSerializer extends ProtobufSerializer {
@Override
public Message copy(Kryo kryo, Message original) {
return original;
}
}


env.getConfig().registerTypeWithKryoSerializer(
        Head.class, HeadSerializer.class);