HI: 使用的flink1.10环境,因为使用sql-client,我在flink/lib目录下放置了flink-sql-connector-elasticsearch6_2.11_1.10.0.jar,因为同事有直接在代码里写的sink到elasticsearch,所以他引入了flink-connector-elasticsearch6_2.11_1.10.0.jar, 这样就会导致他的任务报错。为了重现问题,我在本机同时引入了这两个依赖包: <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>${elasticsearch.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> 本地运行,有以下报错信息: Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bb3bf961144b75bfc71c9dd3efaf59f9) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874) at akka.dispatch.OnComplete.internal(Future.scala:264) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110) ... 33 more Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at akka.actor.Actor$class.aroundReceive(Actor.scala) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ... 4 more Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353) at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: unexpected exception type at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254) ... 8 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) ... 32 more Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization at org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80) ... 42 more 我认为flink-sql-connector-elasticsearch6 应该是包含flink-connector-elasticsearch6的关系,若只引入flink-sql-connector-elasticsearch6,写代码的方式运行任务也还是一样报错,现在就有点搞不清了,希望得得到指导。 谢谢! |
Hi
只引入-sql那个包就行了 代码也应该是可以直接用这个包的 | | JasonLee | | 邮箱:[hidden email] | Signature is customized by Netease Mail Master 在2020年08月07日 09:26,费文杰 写道: HI: 使用的flink1.10环境,因为使用sql-client,我在flink/lib目录下放置了flink-sql-connector-elasticsearch6_2.11_1.10.0.jar,因为同事有直接在代码里写的sink到elasticsearch,所以他引入了flink-connector-elasticsearch6_2.11_1.10.0.jar, 这样就会导致他的任务报错。为了重现问题,我在本机同时引入了这两个依赖包: <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>${elasticsearch.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> 本地运行,有以下报错信息: Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bb3bf961144b75bfc71c9dd3efaf59f9) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874) at akka.dispatch.OnComplete.internal(Future.scala:264) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110) ... 33 more Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at akka.actor.Actor$class.aroundReceive(Actor.scala) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ... 4 more Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353) at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: unexpected exception type at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254) ... 8 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) ... 32 more Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization at org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80) ... 42 more 我认为flink-sql-connector-elasticsearch6 应该是包含flink-connector-elasticsearch6的关系,若只引入flink-sql-connector-elasticsearch6,写代码的方式运行任务也还是一样报错,现在就有点搞不清了,希望得得到指导。 谢谢!
Best Wishes
JasonLee |
Hi,
flink-sql-connector-elasticsearch6_2.11已经打进了es的相关依赖,直接非provided引入应该是没有问题的,您说的报一样的错是错误栈完全相同么?在ide里面直接跑还是提交任务? Best, Yangze Guo On Fri, Aug 7, 2020 at 9:32 AM JasonLee <[hidden email]> wrote: > > Hi > > > 只引入-sql那个包就行了 代码也应该是可以直接用这个包的 > > > | | > JasonLee > | > | > 邮箱:[hidden email] > | > > Signature is customized by Netease Mail Master > > 在2020年08月07日 09:26,费文杰 写道: > > HI: > 使用的flink1.10环境,因为使用sql-client,我在flink/lib目录下放置了flink-sql-connector-elasticsearch6_2.11_1.10.0.jar,因为同事有直接在代码里写的sink到elasticsearch,所以他引入了flink-connector-elasticsearch6_2.11_1.10.0.jar, > 这样就会导致他的任务报错。为了重现问题,我在本机同时引入了这两个依赖包: > <dependency> > <groupId>org.apache.httpcomponents</groupId> > <artifactId>httpclient</artifactId> > <version>4.5.2</version> > </dependency> > > <dependency> > <groupId>org.elasticsearch.client</groupId> > <artifactId>elasticsearch-rest-high-level-client</artifactId> > <version>${elasticsearch.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-elasticsearch6_2.11</artifactId> > <version>${flink.version}</version> > <!--<scope>provided</scope>--> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > > 本地运行,有以下报错信息: > Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bb3bf961144b75bfc71c9dd3efaf59f9) > at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112) > at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874) > at akka.dispatch.OnComplete.internal(Future.scala:264) > at akka.dispatch.OnComplete.internal(Future.scala:261) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) > at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) > at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) > at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) > at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) > at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) > at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) > at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) > at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) > at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) > at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) > at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110) > ... 33 more > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110) > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76) > at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) > at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186) > at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180) > at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484) > at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) > at akka.actor.Actor$class.aroundReceive(Actor.scala) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > ... 4 more > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. > at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144) > at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.IOException: unexpected exception type > at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736) > at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) > at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254) > ... 8 more > Caused by: java.lang.reflect.InvocationTargetException > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) > ... 32 more > Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization > at org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80) > ... 42 more > 我认为flink-sql-connector-elasticsearch6 应该是包含flink-connector-elasticsearch6的关系,若只引入flink-sql-connector-elasticsearch6,写代码的方式运行任务也还是一样报错,现在就有点搞不清了,希望得得到指导。 > 谢谢! |
In reply to this post by 费文杰
Hi
你这种场景使用sql jar就可以了,sql jar 里面包含了 es connector 相关的依赖,两者确实不能共存,因为sql jar 对 es的依赖做了shade,而 es connector jar 没有对es的依赖做shade. 另外,你的异常栈应该是一个已知的lambda表达式序列化问题[1], 在1.11.0已经修复,可以升级1.11.1试下? Best Leonard [1] https://issues.apache.org/jira/browse/FLINK-18006 <https://issues.apache.org/jira/browse/FLINK-18006> > 在 2020年8月7日,09:26,费文杰 <[hidden email]> 写道: > > > HI: > 使用的flink1.10环境,因为使用sql-client,我在flink/lib目录下放置了flink-sql-connector-elasticsearch6_2.11_1.10.0.jar,因为同事有直接在代码里写的sink到elasticsearch,所以他引入了flink-connector-elasticsearch6_2.11_1.10.0.jar, > 这样就会导致他的任务报错。为了重现问题,我在本机同时引入了这两个依赖包: > <dependency> > <groupId>org.apache.httpcomponents</groupId> > <artifactId>httpclient</artifactId> > <version>4.5.2</version> > </dependency> > > <dependency> > <groupId>org.elasticsearch.client</groupId> > <artifactId>elasticsearch-rest-high-level-client</artifactId> > <version>${elasticsearch.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-elasticsearch6_2.11</artifactId> > <version>${flink.version}</version> > <!--<scope>provided</scope>--> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > > 本地运行,有以下报错信息: > Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bb3bf961144b75bfc71c9dd3efaf59f9) > at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112) > at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874) > at akka.dispatch.OnComplete.internal(Future.scala:264) > at akka.dispatch.OnComplete.internal(Future.scala:261) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) > at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) > at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) > at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) > at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) > at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) > at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) > at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) > at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) > at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) > at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) > at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110) > ... 33 more > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110) > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76) > at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) > at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186) > at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180) > at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484) > at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) > at akka.actor.Actor$class.aroundReceive(Actor.scala) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > ... 4 more > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. > at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144) > at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.IOException: unexpected exception type > at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736) > at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) > at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254) > ... 8 more > Caused by: java.lang.reflect.InvocationTargetException > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) > ... 32 more > Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization > at org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80) > ... 42 more > 我认为flink-sql-connector-elasticsearch6 应该是包含flink-connector-elasticsearch6的关系,若只引入flink-sql-connector-elasticsearch6,写代码的方式运行任务也还是一样报错,现在就有点搞不清了,希望得得到指导。 > 谢谢! |
In reply to this post by 费文杰
|
Hi
如果只是sql作业,使用flink-sql-connector-elasticsearch6_2.11_1.10.0 就可以了,如果纯datastream作业使用flink-connector-elasticsearch6_2.11_1.10.0 就可以了 如果两个包都要使用,有两个思路 1. 你自己自己打个包,把上面两个包的依赖放在一起。 2. 和1类似,shade掉flink-connector-elasticsearch6_2.11_1.10.0 我没实际打过,你可以动手试下。 祝好 > 在 2021年4月20日,14:13,william <[hidden email]> 写道: > > 你好,我也遇到了同样的问题,请问你们是怎么解决的,谢谢! > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |