大佬们好,使用flink读取hive报了一个很奇怪错:Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: invalid type code: 9C。一直不得其解,求赐教啊~
flink:1.11.2 hive: 2.2.0 hadoop:3.0.3 感觉像是输入输出流的包装那里有问题,但不知道是哪里有问题。on yarn的话,原来的HiveCatalog需要一个hive-site.xml路径,但是on yarn的话,这个怎么找,所以这里的HiveConf是我自己构造的,会不会跟这个有关系? 完成代码: public class flink { public static void main(String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() // 使用BlinkPlanner .inBatchMode() // Batch模式,默认为StreamingMode .build(); TableEnvironment tableEnv = TableEnvironment.create(settings); String name = "myhive"; // Catalog名称,定义一个唯一的名称表示 String defaultDatabase = "vr_shopping_test"; // 默认数据库名称 String hiveConfDir = "/Users/john/IdeaProjects/flink-hive-x/src/main/resources"; // hive-site.xml路径 String version = "2.1.1"; HiveConf hiveConf = createHiveConf(); hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://xxxxxxxxxx:9083 <thrift://xxxxxxxxxx:9083>"); HiveCatalog hive = new MyHiveCatalog(name, defaultDatabase, hiveConf, version); tableEnv.registerCatalog("myhive", hive); tableEnv.useCatalog("myhive"); tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); tableEnv.useDatabase(defaultDatabase); // 创建数据库,目前不支持创建hive表 String createDbSql = "select * from dim_vr_shopping_wx_users_v1 limit 1"; TableResult res = tableEnv.executeSql(createDbSql); res.print(); } private static HiveConf createHiveConf() { // create HiveConf from hadoop configuration Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration()); // Add mapred-site.xml. We need to read configurations like compression codec. for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new org.apache.flink.configuration.Configuration())) { File mapredSite = new File(new File(possibleHadoopConfPath), "mapred-site.xml"); if (mapredSite.exists()) { hadoopConf.addResource(new Path(mapredSite.getAbsolutePath())); break; } } return new HiveConf(hadoopConf, HiveConf.class); } } 完整堆栈: The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) Caused by: org.apache.flink.table.api.TableException: Failed to execute sql at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:747) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1069) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) at com.aibee.flink.main(flink.java:42) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ... 11 more Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128) at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:736) ... 19 more Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:292) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job. at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344) at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager. at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ... 6 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Source: HiveTableSource(id, openid, unionid, nickname, language, city, province, headimgurl, country, last_login_time, privilege, created_at, updated_at, uuid, vip_code, level_id, member_id, sex) TablePath: vr_shopping_test.dim_vr_shopping_wx_users_v1, PartitionPruned: false, PartitionNums: null, ProjectedFields: [15]': Loading the input/output formats failed: at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216) at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269) at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242) at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119) at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284) at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140) at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388) ... 7 more Caused by: java.lang.Exception: Loading the input/output formats failed: at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:155) at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:59) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:212) ... 19 more Caused by: java.lang.RuntimeException: Deserializing the input/output formats failed: Could not read the user code wrapper: invalid type code: 9C at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68) at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:152) ... 21 more Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: invalid type code: 9C at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290) at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66) ... 22 more Caused by: java.io.StreamCorruptedException: invalid type code: 9C at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1563) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2125) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at java.util.HashMap.readObject(HashMap.java:1404) at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288) ... 23 more End of exception on server side>] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ... 4 more |
这个错误一般是因为JM或者TM那边缺少相应的依赖,导致反序列化的时候失败了。
On Tue, Nov 10, 2020 at 8:22 AM 蒋龙 <[hidden email]> wrote: > 大佬们好,使用flink读取hive报了一个很奇怪错:Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: > Could not read the user code wrapper: invalid type code: 9C。一直不得其解,求赐教啊~ > flink:1.11.2 > hive: 2.2.0 > hadoop:3.0.3 > > 感觉像是输入输出流的包装那里有问题,但不知道是哪里有问题。on > yarn的话,原来的HiveCatalog需要一个hive-site.xml路径,但是on > yarn的话,这个怎么找,所以这里的HiveConf是我自己构造的,会不会跟这个有关系? > > 完成代码: > public class flink { > public static void main(String[] args) throws Exception { > EnvironmentSettings settings = EnvironmentSettings > .newInstance() > .useBlinkPlanner() // 使用BlinkPlanner > .inBatchMode() // Batch模式,默认为StreamingMode > .build(); > TableEnvironment tableEnv = TableEnvironment.create(settings); > String name = "myhive"; // Catalog名称,定义一个唯一的名称表示 > String defaultDatabase = "vr_shopping_test"; // 默认数据库名称 > String hiveConfDir = > "/Users/john/IdeaProjects/flink-hive-x/src/main/resources"; // > hive-site.xml路径 > String version = "2.1.1"; > > HiveConf hiveConf = createHiveConf(); > hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, > "thrift://xxxxxxxxxx:9083 <thrift://xxxxxxxxxx:9083>"); > > HiveCatalog hive = new MyHiveCatalog(name, defaultDatabase, > hiveConf, version); > tableEnv.registerCatalog("myhive", hive); > tableEnv.useCatalog("myhive"); > tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > tableEnv.useDatabase(defaultDatabase); > // 创建数据库,目前不支持创建hive表 > String createDbSql = "select * from dim_vr_shopping_wx_users_v1 > limit 1"; > TableResult res = tableEnv.executeSql(createDbSql); > res.print(); > } > > private static HiveConf createHiveConf() { > // create HiveConf from hadoop configuration > Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(new > org.apache.flink.configuration.Configuration()); > > // Add mapred-site.xml. We need to read configurations like > compression codec. > for (String possibleHadoopConfPath : > HadoopUtils.possibleHadoopConfPaths(new > org.apache.flink.configuration.Configuration())) { > File mapredSite = new File(new File(possibleHadoopConfPath), > "mapred-site.xml"); > if (mapredSite.exists()) { > hadoopConf.addResource(new > Path(mapredSite.getAbsolutePath())); > break; > } > } > return new HiveConf(hadoopConf, HiveConf.class); > } > } > > > > 完整堆栈: > > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error: Failed to execute sql > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > Caused by: org.apache.flink.table.api.TableException: Failed to execute sql > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:747) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1069) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) > at com.aibee.flink.main(flink.java:42) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ... 11 more > Caused by: org.apache.flink.util.FlinkException: Failed to execute job > 'collect'. > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821) > at > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128) > at > org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:736) > ... 19 more > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed > to submit JobGraph. > at > org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:292) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.runtime.rest.util.RestClientException: > [Internal server error., <Exception on server side: > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit > job. > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344) > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) > at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could > not instantiate JobManager. > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) > ... 6 more > Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot > initialize task 'Source: HiveTableSource(id, openid, unionid, nickname, > language, city, province, headimgurl, country, last_login_time, privilege, > created_at, updated_at, uuid, vip_code, level_id, member_id, sex) > TablePath: vr_shopping_test.dim_vr_shopping_wx_users_v1, PartitionPruned: > false, PartitionNums: null, ProjectedFields: [15]': Loading the > input/output formats failed: > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216) > at > org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269) > at > org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242) > at > org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119) > at > org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103) > at > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284) > at > org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) > at > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140) > at > org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84) > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388) > ... 7 more > Caused by: java.lang.Exception: Loading the input/output formats failed: > at > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:155) > at > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:59) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:212) > ... 19 more > Caused by: java.lang.RuntimeException: Deserializing the input/output > formats failed: Could not read the user code wrapper: invalid type code: 9C > at > org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68) > at > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:152) > ... 21 more > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: > Could not read the user code wrapper: invalid type code: 9C > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290) > at > org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66) > ... 22 more > Caused by: java.io.StreamCorruptedException: invalid type code: 9C > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1563) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2125) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) > at java.util.HashMap.readObject(HashMap.java:1404) > at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288) > ... 23 more > > End of exception on server side>] > at > org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) > at > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > ... 4 more -- Best regards! Rui Li |
Free forum by Nabble | Edit this page |