Hi all:
I'm using Flink 1.10.2 and wrote a job that runs fine locally, but as soon as I submit it to a Flink cluster as a task, it fails at startup with the exception below. What could be the cause?

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class invalid for deserialization
    at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
    at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)

The rough logic is as follows; I have two pipelines:
1. Several Kafka sources are unioned into a single stream, which passes through two operators and is finally sinked to Kafka.
2. A single Kafka source yields a stream that passes through one operator and is finally sinked to Kafka.

The code:

StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment();
List<KafkaInfo> kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
KafkaInfo kafkaSinkConfiguration = this.kafkaConfiguration.getSink();

RecordTransformOperator transformOperator =
        new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
RecordKeySelector keySelector = new RecordKeySelector();
RecordComputeOperator computeOperator = new RecordComputeOperator();

Properties sinkProperties = new Properties();
sinkProperties.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
FlinkKafkaProducer011 flinkKafkaProducer = new FlinkKafkaProducer011(
        kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties);

List<SingleOutputStreamOperator<Tuple2<String, String>>> dataStreamList = new ArrayList<>();
for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
    Properties sourceProperties = new Properties();
    sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
    sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
    sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
    sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
    String topicName = kafkaInfo.getTopicName();
    FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
            new FlinkKafkaConsumer011(topicName, new KafkaDeserializer(), sourceProperties);
    SingleOutputStreamOperator<Tuple2<String, String>> singleOutputStreamOperator =
            streamExecutionEnvironment.addSource(flinkKafkaConsumer);
    dataStreamList.add(singleOutputStreamOperator);
}

DataStream<Tuple2<String, String>> unionDataStream = dataStreamList.get(0);
for (int i = 1; i < dataStreamList.size(); i++) {
    unionDataStream = unionDataStream.union(dataStreamList.get(i));
}
unionDataStream.flatMap(transformOperator)
        .keyBy(keySelector)
        .flatMap(computeOperator)
        .addSink(flinkKafkaProducer);

RecordTransformOperator transformOperator1 =
        new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM);
Properties sinkProperties1 = new Properties();
sinkProperties1.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
FlinkKafkaProducer011 flinkKafkaProducer1 = new FlinkKafkaProducer011(
        kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties1);

KafkaInfo kafkaInfo = this.kafkaConfiguration.getSource1().get(0);
Properties sourceProperties = new Properties();
sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
String topicName = kafkaInfo.getTopicName();
FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
        new FlinkKafkaConsumer011(topicName, new KafkaDeserializer(), sourceProperties);
streamExecutionEnvironment
        .addSource(flinkKafkaConsumer)
        .flatMap(transformOperator1)
        .addSink(flinkKafkaProducer1);

streamExecutionEnvironment.execute();

[hidden email]
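As an aside, the per-source setup in the code above repeats the same property block for every source. Assuming KafkaInfo and KafkaDeserializer behave as shown in the code (they are the poster's own classes), it could be collapsed into one helper. A sketch only, unrelated to the error itself:

// Sketch: centralizes the consumer construction from the code above.
private FlinkKafkaConsumer011<Tuple2<String, String>> buildConsumer(KafkaInfo kafkaInfo) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
    props.setProperty("group.id", kafkaInfo.getGroupId());
    props.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
    props.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
    return new FlinkKafkaConsumer011<>(kafkaInfo.getTopicName(), new KafkaDeserializer(), props);
}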
Hi,
Judging from the stack trace, the failure is located in the deserialization of your job's operators:

    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)

InstantiationUtil fails while reading the serialized user function back, so this should be a deserialization problem. The local test passes because a local run does not involve this serialization step.
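One way to exercise that missing serialization step before submitting is to round-trip the operator through the same InstantiationUtil shown in the stack trace. This is only a sketch (not code from this thread), and it can only catch problems that are visible on the client's own classpath:

// Sketch: serialize a user function and read it back the way Flink ships
// operators to the TaskManagers. With a broken class this fails with the
// same java.io.InvalidClassException before the job is ever submitted.
static void checkRoundTrip(Object userFunction) throws Exception {
    byte[] bytes = org.apache.flink.util.InstantiationUtil.serializeObject(userFunction);
    Object copy = org.apache.flink.util.InstantiationUtil.deserializeObject(
            bytes, Thread.currentThread().getContextClassLoader());
}

// Usage: checkRoundTrip(flinkKafkaProducer);

If this passes on the client but the cluster still fails, the two classpaths differ, which usually points at a connector-version mismatch between the job jar and the cluster.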
Best,
Robin

[hidden email] wrote:
> [original message quoted in full above; quote trimmed]
I never call InstantiationUtil explicitly anywhere in my code, so how should I go about locating the root cause? Could you take a look and suggest a workable solution?
[hidden email]

From: Robin Zhang
Sent: 2020-10-20 14:39
To: user-zh
Subject: Re: Error in multi-stream logic within a single job

> [Robin's reply and the original message, quoted in full above; quote trimmed]
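A common next step for an InvalidClassException like this one (an assumption on my part, not something established in this thread) is to check which jar actually provides FlinkKafkaProducer011 on the client and on the TaskManagers; a mismatch between the connector bundled into the job jar and a second copy under the cluster's lib/ directory would explain a job that serializes fine locally but cannot be deserialized on the cluster. A hypothetical probe:

// Hypothetical probe: print the jar the connector class was loaded from.
// Run once in main() on the client and once in an operator's open() on
// the cluster, then compare the two locations.
System.out.println(
        FlinkKafkaProducer011.class.getProtectionDomain()
                .getCodeSource().getLocation());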