Questions about using the Streaming File Sink


cs
I'm using the Streaming File Sink to do bulk writes in Parquet/Avro format. The code is as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60 * 1000, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(new FsStateBackend(new Path("file:///g:/checkpoint")));
Schema schema = new Schema.Parser().parse("{\"namespace\":\"example.avro\",\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
final DataStreamSource<GenericRecord> source = env.addSource(new RichSourceFunction<GenericRecord>() {
    Schema schema1;

    @Override
    public void open(Configuration parameters) throws Exception {
        schema1 = new Schema.Parser().parse("{\"namespace\":\"example.avro\",\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
    }

    @Override
    public void run(SourceContext<GenericRecord> ctx) throws Exception {
        while (true) {
            Thread.sleep(2000);
            GenericRecord record = new GenericData.Record(schema1);
            record.put("name", "zhangsan");
            ctx.collect(record);
        }
    }

    @Override
    public void cancel() {

    }
});
final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("file:///g:/tmp/streamsink"), ParquetAvroWriters.forGenericRecord(schema))
        .build();
source.addSink(sink);
env.execute();

But when the program runs, it fails with the following error. What is the cause?

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at myflink.connector.StreamFileSinkConnector$1.run(StreamFileSinkConnector.java:37)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.lang.UnsupportedOperationException
	at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	... 22 more
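The `Caused by` line pinpoints the mechanism: Kryo's `CollectionSerializer` rebuilds collections by calling `add()`, but Avro's `Schema$Field` keeps its `reserved` names in an unmodifiable collection, so the copy fails. A minimal JDK-only reproduction of that exception (the class and variable names here are illustrative, not from the post):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class UnmodifiableDemo {
    public static void main(String[] args) {
        // Stand-in for the "reserved" field inside org.apache.avro.Schema$Field:
        // an unmodifiable view of a list.
        List<String> reserved = Collections.unmodifiableList(new ArrayList<>());
        try {
            // This is effectively what Kryo's CollectionSerializer.read() does
            // when it repopulates the deserialized collection.
            reserved.add("doc");
            System.out.println("add succeeded (unexpected)");
        } catch (UnsupportedOperationException e) {
            System.out.println("caught UnsupportedOperationException");
        }
    }
}
```

This is why the failure happens inside `KryoSerializer.copy` between chained operators, not in the Parquet writer itself: the record is being Kryo-copied on its way to the sink.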

Re: Questions about using the Streaming File Sink

Yun Gao
Judging from the error, GenericRecord probably cannot be serialized. For now, you could try using a custom data type to transport the records instead.
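This diagnosis matches the trace: Flink cannot derive type information for `GenericRecord`, so it falls back to Kryo, which chokes on the unmodifiable collections inside the embedded Avro `Schema`. Besides switching to a custom data type, one workaround to consider (a sketch, assuming the `flink-avro` module is on the classpath and a Flink version that ships `GenericRecordAvroTypeInfo`) is to declare the Avro type explicitly on the source, so Flink uses its Avro serializer instead of Kryo; `MyGenericRecordSource` stands in for the anonymous source in the post:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TypedGenericRecordJob {
    public static void sketch(StreamExecutionEnvironment env, Schema schema) {
        DataStream<GenericRecord> source = env
                .addSource(new MyGenericRecordSource(schema))   // hypothetical source, as in the post
                // Tell Flink the records are Avro GenericRecords with this schema,
                // so serialization between operators goes through Avro, not Kryo.
                .returns(new GenericRecordAvroTypeInfo(schema));
        // ... attach the StreamingFileSink as before ...
    }
}
```

Note that `Schema` itself is not serializable across Flink versions in a uniform way, which is presumably why the original source re-parses the schema string in `open()`; the same pattern would apply to `MyGenericRecordSource` here.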


------------------------------------------------------------------
From:58683632 <[hidden email]>
Send Time: 2020 Mar. 17 (Tue.) 13:33
To:user-zh <[hidden email]>
Subject: Questions about using the Streaming File Sink
