Job execution fails with flink-sql-connector-elasticsearch6_2.11_1.10.0.jar

Job execution fails with flink-sql-connector-elasticsearch6_2.11_1.10.0.jar

费文杰

Here is my code:
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.index.IndexRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.Requests;
import org.apache.http.HttpHost;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.*;
@Slf4j
public class TrackToEsJob {
    public static void main(String[] args) throws Exception {
        // Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism; usually kept in line with the number of machine cores
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        final Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.100:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "track-flink-group");
        kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("bi-track-log-client",
                new SimpleStringSchema(), kafkaProps);
        // Start from the latest offsets by default
        flinkKafkaConsumer.setStartFromLatest();
        // 1. Kafka source stream; use a parallelism equal to the number of partitions
        int partitionCount = 1;
        DataStream<String> sourceStream = env.addSource(flinkKafkaConsumer)
                .setParallelism(1)
                .name("source_kafka_trackTopics")
                .uid("source_kafka_trackTopics");
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("192.168.1.101", 9200, "http"));
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<String>() {
          public IndexRequest createIndexRequest(String o) {
                JSONObject jsonObject = JSONObject.parseObject(o);
                System.out.println("saving data: " + jsonObject.toJSONString());
                Map<String,String> esData = new HashMap<>();
                esData.put("appId",jsonObject.getString("appId"));
                esData.put("indexKey",jsonObject.getString("indexKey"));
                esData.put("indexValue",jsonObject.getString("indexValue"));
                return Requests.indexRequest()
                        .index("bi_track_log_es")
                        .type("doc")
                        .source(esData);
            }
            @Override
            public void process(String o, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                requestIndexer.add(createIndexRequest(o));
            }
        });
        esSinkBuilder.setBulkFlushMaxActions(1);
        sourceStream.addSink(esSinkBuilder.build());
        env.execute("TrackToEsJob");
    }
}

My understanding is that flink-sql-connector-elasticsearch6 should contain flink-connector-elasticsearch6. Yet if I switch the dependency to flink-connector-elasticsearch6_2.11_1.10.0 instead, the job runs fine, so at this point I am confused and would appreciate some guidance. Thanks!

Here is the error message:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unexpected exception type
at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254)
... 8 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
... 32 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80)
... 42 more
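
Since the question is which jar actually supplies the connector classes at runtime, a generic JVM diagnostic can settle it (nothing Flink-specific; the class name is taken from the stack trace above) by printing the class's code source:

// Generic diagnostic sketch: print the jar a class was loaded from, to
// verify whether the shaded sql-connector or the plain connector is on
// the classpath of the failing TaskManager.
public class WhichJar {
    public static void main(String[] args) throws ClassNotFoundException {
        Class<?> builder = Class.forName(
                "org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink$Builder");
        System.out.println(builder.getProtectionDomain().getCodeSource().getLocation());
    }
}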

Re: Job execution fails with flink-sql-connector-elasticsearch6_2.11_1.10.0.jar

Shengkai Fang
Do you mean that the job is fine with the 1.10 es jar, but fails with the 1.11 one?
This looks like a bug: it was fixed for es7 in 1.11, but the fix was not applied to es6.
See [1] https://issues.apache.org/jira/browse/FLINK-18006?filter=-2
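
If upgrading is not an option, one possible user-side workaround (a sketch based on the FLINK-18006 discussion, assuming the failing lambda is the Builder's default RestClientFactory, which the stack trace points at) is to set an explicit, non-lambda RestClientFactory on the builder before build(), so the default lambda never ends up in the serialized sink:

import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
// Shaded path to match the imports in the job above; with the plain
// flink-connector-elasticsearch6 jar this would be org.elasticsearch.client.RestClientBuilder.
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClientBuilder;

public class EsSinkWorkaround {
    // Call this on the esSinkBuilder from the job above, before build().
    public static <T> void useNonLambdaRestClientFactory(ElasticsearchSink.Builder<T> esSinkBuilder) {
        // An anonymous class is deserialized by its (relocated) class name,
        // so the TaskManager never has to resolve a SerializedLambda against
        // the shaded Builder class.
        esSinkBuilder.setRestClientFactory(new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                // Intentionally empty: mirrors the connector's default no-op factory.
            }
        });
    }
}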

费文杰 <[hidden email]> wrote on Fri, Aug 7, 2020 at 15:56:

Re: Job execution fails with flink-sql-connector-elasticsearch6_2.11_1.10.0.jar

Shengkai Fang
Sorry, the corresponding fix was applied to es6 as well.
But it does look like the same issue.
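
For context on why shading triggers this at all: a serializable lambda is written out as a SerializedLambda and resolved on read through the synthetic $deserializeLambda$ method of the class that declared it, and package relocation by the shade plugin invalidates that lookup. A toy sketch of the pattern (hypothetical Factory type, not the connector's code):

public class LambdaShadingSketch {
    // Hypothetical callback type, standing in for the connector's RestClientFactory.
    interface Factory extends java.io.Serializable {
        void configure(StringBuilder builder);
    }

    // Fragile under shading: serialized as a SerializedLambda and resolved on
    // read via LambdaShadingSketch.$deserializeLambda$, which relocation breaks
    // ("Invalid lambda deserialization").
    static final Factory LAMBDA_DEFAULT = builder -> { };

    // Robust under shading: an ordinary class, deserialized by its (relocated)
    // class name with no SerializedLambda indirection.
    static final Factory CLASS_DEFAULT = new Factory() {
        @Override
        public void configure(StringBuilder builder) { }
    };

    public static void main(String[] args) {
        LAMBDA_DEFAULT.configure(new StringBuilder());
        CLASS_DEFAULT.configure(new StringBuilder());
    }
}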

Shengkai Fang <[hidden email]> wrote on Fri, Aug 7, 2020 at 19:52: