以下是我的代码: import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.index.IndexRequest; import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.Requests; import org.apache.http.HttpHost; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.*; @Slf4j public class TrackToEsJob { public static void main(String[] args) throws Exception { // 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并发数,一般跟机器核数保持一致 env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); final Properties kafkaProps = new Properties(); kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.100:9092"); kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "track-flink-group"); kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer("bi-track-log-client", new SimpleStringSchema(), kafkaProps); // 默认从最近开始消费 flinkKafkaConsumer.setStartFromLatest(); // 1、kafka来源stream,使用跟分区数量一致的并行度 int partitionCount = 1; DataStream<String> sourceStream = env.addSource(flinkKafkaConsumer) .setParallelism(1) .name("source_kafka_trackTopics") .uid("source_kafka_trackTopics"); List<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("192.168.1.101", 9200, "http")); ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder(httpHosts, new ElasticsearchSinkFunction<String>() { public IndexRequest createIndexRequest(String o) { JSONObject jsonObject = JSONObject.parseObject(o); System.out.println("saving data" +jsonObject.toJSONString()); Map<String,String> esData = new HashMap<>(); esData.put("appId",jsonObject.getString("appId")); esData.put("indexKey",jsonObject.getString("indexKey")); esData.put("indexValue",jsonObject.getString("indexValue")); return Requests.indexRequest() .index("bi_track_log_es") .type("doc") .source(esData); } @Override public void process(String o, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { requestIndexer.add(createIndexRequest(o)); } }); esSinkBuilder.setBulkFlushMaxActions(1); sourceStream.addSink(esSinkBuilder.build()); env.execute("TrackToEsJob"); }我认为flink-sql-connector-elasticsearch6 应该是包含flink-connector-elasticsearch6的关系,若换为引入flink-connector-elasticsearch6_2.11_1.10.0,任务就可以正常执行,现在就有点搞不清了,希望得到指导。谢谢! 以下为报错信息: Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353) at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: unexpected exception type at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254) ... 8 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) ... 32 more Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization at org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80) ... 42 more |
你的意思是不是用1.10的es包没问题,但是用1.11的有问题?
似乎是一个bug,在1.11 es7上已经修复了这个bug,但是没有对于es6进行修复。 参见[1] https://issues.apache.org/jira/browse/FLINK-18006?filter=-2 费文杰 <[hidden email]> 于2020年8月7日周五 下午3:56写道: > > 以下是我的代码: > import com.alibaba.fastjson.JSONObject; > import lombok.extern.slf4j.Slf4j; > import org.apache.flink.api.common.functions.RuntimeContext; > import org.apache.flink.api.common.serialization.SimpleStringSchema; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.datastream.DataStream; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; > import > org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; > import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; > import org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.action.index.IndexRequest; > import org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.client.Requests; > import org.apache.http.HttpHost; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > import org.apache.kafka.clients.consumer.ConsumerConfig; > import java.util.*; > @Slf4j > public class TrackToEsJob { > public static void main(String[] args) throws Exception { > // 获取执行环境 > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // 设置并发数,一般跟机器核数保持一致 > env.setParallelism(1); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > final Properties kafkaProps = new Properties(); > kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, " > 192.168.1.100:9092"); > kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, > "track-flink-group"); > kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, > "false"); > FlinkKafkaConsumer<String> flinkKafkaConsumer = new > FlinkKafkaConsumer("bi-track-log-client", > new SimpleStringSchema(), kafkaProps); > // 默认从最近开始消费 > flinkKafkaConsumer.setStartFromLatest(); > // 1、kafka来源stream,使用跟分区数量一致的并行度 > int partitionCount = 1; > DataStream<String> sourceStream = env.addSource(flinkKafkaConsumer) > .setParallelism(1) > .name("source_kafka_trackTopics") > .uid("source_kafka_trackTopics"); > List<HttpHost> httpHosts = new ArrayList<>(); > httpHosts.add(new HttpHost("192.168.1.101", 9200, "http")); > ElasticsearchSink.Builder<String> esSinkBuilder = new > ElasticsearchSink.Builder(httpHosts, new > ElasticsearchSinkFunction<String>() { > public IndexRequest createIndexRequest(String o) { > JSONObject jsonObject = JSONObject.parseObject(o); > System.out.println("saving data" > +jsonObject.toJSONString()); > Map<String,String> esData = new HashMap<>(); > esData.put("appId",jsonObject.getString("appId")); > esData.put("indexKey",jsonObject.getString("indexKey")); > > esData.put("indexValue",jsonObject.getString("indexValue")); > return Requests.indexRequest() > .index("bi_track_log_es") > .type("doc") > .source(esData); > } > @Override > public void process(String o, RuntimeContext runtimeContext, > RequestIndexer requestIndexer) { > requestIndexer.add(createIndexRequest(o)); > } > }); > esSinkBuilder.setBulkFlushMaxActions(1); > sourceStream.addSink(esSinkBuilder.build()); > env.execute("TrackToEsJob"); > }我认为flink-sql-connector-elasticsearch6 > 应该是包含flink-connector-elasticsearch6的关系,若换为引入flink-connector-elasticsearch6_2.11_1.10.0,任务就可以正常执行,现在就有点搞不清了,希望得到指导。谢谢! > > 以下为报错信息: > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: > Cannot instantiate user function. > at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.IOException: unexpected exception type > at > java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736) > at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) > at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254) > ... 8 more > Caused by: java.lang.reflect.InvocationTargetException > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) > ... 32 more > Caused by: java.lang.IllegalArgumentException: Invalid lambda > deserialization > at > org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80) > ... 42 more > |
不好意思,在es6上也进行了相应的修复。
但似乎是一个相同的问题。 Shengkai Fang <[hidden email]> 于2020年8月7日周五 下午7:52写道: > 你的意思是不是用1.10的es包没问题,但是用1.11的有问题? > 似乎是一个bug,在1.11 es7上已经修复了这个bug,但是没有对于es6进行修复。 > 参见[1] https://issues.apache.org/jira/browse/FLINK-18006?filter=-2 > > 费文杰 <[hidden email]> 于2020年8月7日周五 下午3:56写道: > >> >> 以下是我的代码: >> import com.alibaba.fastjson.JSONObject; >> import lombok.extern.slf4j.Slf4j; >> import org.apache.flink.api.common.functions.RuntimeContext; >> import org.apache.flink.api.common.serialization.SimpleStringSchema; >> import org.apache.flink.streaming.api.TimeCharacteristic; >> import org.apache.flink.streaming.api.datastream.DataStream; >> import >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >> import >> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; >> import >> org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; >> import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; >> import org.apache.flink.elasticsearch6.shaded.org >> .elasticsearch.action.index.IndexRequest; >> import org.apache.flink.elasticsearch6.shaded.org >> .elasticsearch.client.Requests; >> import org.apache.http.HttpHost; >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >> import org.apache.kafka.clients.consumer.ConsumerConfig; >> import java.util.*; >> @Slf4j >> public class TrackToEsJob { >> public static void main(String[] args) throws Exception { >> // 获取执行环境 >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> // 设置并发数,一般跟机器核数保持一致 >> env.setParallelism(1); >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> final Properties kafkaProps = new Properties(); >> kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, " >> 192.168.1.100:9092"); >> kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, >> "track-flink-group"); >> kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, >> "false"); >> FlinkKafkaConsumer<String> flinkKafkaConsumer = new >> FlinkKafkaConsumer("bi-track-log-client", >> new SimpleStringSchema(), kafkaProps); >> // 默认从最近开始消费 >> flinkKafkaConsumer.setStartFromLatest(); >> // 1、kafka来源stream,使用跟分区数量一致的并行度 >> int partitionCount = 1; >> DataStream<String> sourceStream = >> env.addSource(flinkKafkaConsumer) >> .setParallelism(1) >> .name("source_kafka_trackTopics") >> .uid("source_kafka_trackTopics"); >> List<HttpHost> httpHosts = new ArrayList<>(); >> httpHosts.add(new HttpHost("192.168.1.101", 9200, "http")); >> ElasticsearchSink.Builder<String> esSinkBuilder = new >> ElasticsearchSink.Builder(httpHosts, new >> ElasticsearchSinkFunction<String>() { >> public IndexRequest createIndexRequest(String o) { >> JSONObject jsonObject = JSONObject.parseObject(o); >> System.out.println("saving data" >> +jsonObject.toJSONString()); >> Map<String,String> esData = new HashMap<>(); >> esData.put("appId",jsonObject.getString("appId")); >> esData.put("indexKey",jsonObject.getString("indexKey")); >> >> esData.put("indexValue",jsonObject.getString("indexValue")); >> return Requests.indexRequest() >> .index("bi_track_log_es") >> .type("doc") >> .source(esData); >> } >> @Override >> public void process(String o, RuntimeContext runtimeContext, >> RequestIndexer requestIndexer) { >> requestIndexer.add(createIndexRequest(o)); >> } >> }); >> esSinkBuilder.setBulkFlushMaxActions(1); >> sourceStream.addSink(esSinkBuilder.build()); >> env.execute("TrackToEsJob"); >> }我认为flink-sql-connector-elasticsearch6 >> 应该是包含flink-connector-elasticsearch6的关系,若换为引入flink-connector-elasticsearch6_2.11_1.10.0,任务就可以正常执行,现在就有点搞不清了,希望得到指导。谢谢! >> >> 以下为报错信息: >> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: >> Cannot instantiate user function. >> at >> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: java.io.IOException: unexpected exception type >> at >> java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736) >> at >> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) >> at >> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) >> at >> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) >> at >> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) >> at >> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) >> at >> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254) >> ... 8 more >> Caused by: java.lang.reflect.InvocationTargetException >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) >> ... 32 more >> Caused by: java.lang.IllegalArgumentException: Invalid lambda >> deserialization >> at >> org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80) >> ... 42 more >> > |
Free forum by Nabble | Edit this page |