使用flink版本为1.10-snapshot,连接hive版本为1.1.0-cdh5.4.7,大数据集群有kerberos认证。
我是用1.2.1的方式连接hive的。hiveCatalog可以取到表结构,但在启动作业的时候报错,Standalone模式和yarn模式都报同样的错。 请问有人遇到过这种问题吗? 报错信息如下: ------------------------------------------------------------ The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 3f3033f7076c332529f3ac8250713889) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:243) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326) at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820) at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) at com.sean.HiveCatalogExample.main(HiveCatalogExample.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:211) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job. at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333) at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ... 6 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152) at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375) at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) ... 7 more Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Error in configuring object at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270) at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:897) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106) at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207) at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184) at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176) at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275) at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146) ... 10 more Caused by: java.lang.RuntimeException: Error in configuring object at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75) at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:161) at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:67) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256) ... 22 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106) ... 26 more Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found. at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135) at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175) at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45) ... 31 more Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060) at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128) ... 33 more End of exception on server side>] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ... 4 more 代码如下: ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment(); execEnv.setParallelism(1); BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv); HiveCatalog hiveCatalog = new HiveCatalog("hive_catalog", null, "/home/sean/120_conf", "1.2.1"); hiveCatalog.open(); tableEnv.registerCatalog("myhive", hiveCatalog); Optional<Catalog> myHive = tableEnv.getCatalog("myhive"); ObjectPath myTablePath = new ObjectPath("sean_test", "flink_test_01"); // 这里可以打印 System.out.println(myHive.get().getTable(myTablePath).getSchema()); tableEnv.useCatalog("myhive"); Table table = tableEnv.sqlQuery("select * from sean_test.flink_test_01"); List<Row> result = tableEnv.toDataSet(table, Row.class).collect(); System.out.println(result); tableEnv.execute(""); Hive相关的pom配置如下: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-metastore</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> <exclusions> <exclusion> <groupId>org.apache.thrift</groupId> <artifactId>libfb303</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop-2-uber</artifactId> <version>2.6.5-7.0</version> <scope>provided</scope> </dependency> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 |
hi 苏欣:
建议先检查一下最后打的 jar 包里面是否包含了 com.hadoop.compression.lzo.LzoCodec 和 com.hadoop.compression.lzo.LzoCodec 苏 欣 <[hidden email]> 于2019年8月9日周五 下午5:41写道: > 使用flink版本为1.10-snapshot,连接hive版本为1.1.0-cdh5.4.7,大数据集群有kerberos认证。 > > 我是用1.2.1的方式连接hive的。hiveCatalog可以取到表结构,但在启动作业的时候报错,Standalone模式和yarn模式都报同样的错。 > 请问有人遇到过这种问题吗? > > 报错信息如下: > ------------------------------------------------------------ > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: Could not > retrieve the execution result. (JobID: 3f3033f7076c332529f3ac8250713889) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:243) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) > at com.sean.HiveCatalogExample.main(HiveCatalogExample.java:49) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed > to submit JobGraph. > at > org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:211) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.runtime.rest.util.RestClientException: > [Internal server error., <Exception on server side: > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit > job. > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333) > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) > at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: > org.apache.flink.runtime.client.JobExecutionException: Could not set up > JobManager > at > org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36) > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) > ... 6 more > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could > not set up JobManager > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152) > at > org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83) > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375) > at > org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) > ... 7 more > Caused by: org.apache.flink.runtime.JobException: Creating the input > splits caused an error: Error in configuring object > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:897) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176) > at > org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) > at > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275) > at > org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146) > ... 10 more > Caused by: java.lang.RuntimeException: Error in configuring object > at > org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109) > at > org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75) > at > org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:161) > at > org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:67) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256) > ... 22 more > Caused by: java.lang.reflect.InvocationTargetException > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106) > ... 26 more > Caused by: java.lang.IllegalArgumentException: Compression codec > com.hadoop.compression.lzo.LzoCodec not found. > at > org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135) > at > org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175) > at > org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45) > ... 31 more > Caused by: java.lang.ClassNotFoundException: Class > com.hadoop.compression.lzo.LzoCodec not found > at > org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060) > at > org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128) > ... 33 more > > End of exception on server side>] > at > org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373) > at > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > ... 4 more > > > 代码如下: > > ExecutionEnvironment execEnv = > ExecutionEnvironment.getExecutionEnvironment(); > execEnv.setParallelism(1); > BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv); > > HiveCatalog hiveCatalog = new HiveCatalog("hive_catalog", null, > "/home/sean/120_conf", "1.2.1"); > hiveCatalog.open(); > > tableEnv.registerCatalog("myhive", hiveCatalog); > > Optional<Catalog> myHive = tableEnv.getCatalog("myhive"); > > ObjectPath myTablePath = new ObjectPath("sean_test", "flink_test_01"); > // 这里可以打印 > System.out.println(myHive.get().getTable(myTablePath).getSchema()); > > tableEnv.useCatalog("myhive"); > Table table = tableEnv.sqlQuery("select * from sean_test.flink_test_01"); > List<Row> result = tableEnv.toDataSet(table, Row.class).collect(); > System.out.println(result); > tableEnv.execute(""); > > > > > > Hive相关的pom配置如下: > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.hive</groupId> > <artifactId>hive-metastore</artifactId> > <version>1.2.1</version> > </dependency> > > <dependency> > <groupId>org.apache.hive</groupId> > <artifactId>hive-exec</artifactId> > <version>1.2.1</version> > <exclusions> > <exclusion> > <groupId>org.apache.thrift</groupId> > <artifactId>libfb303</artifactId> > </exclusion> > </exclusions> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-shaded-hadoop-2-uber</artifactId> > <version>2.6.5-7.0</version> > <scope>provided</scope> > </dependency> > > > > > 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 > > |
In reply to this post by 苏 欣
先问下你的hdfs上的文件是不是用lzo压缩的?
[hidden email] 发件人: 苏 欣 发送时间: 2019-08-09 17:40 收件人: [hidden email] 主题: flink1.10版本连接hive报错 使用flink版本为1.10-snapshot,连接hive版本为1.1.0-cdh5.4.7,大数据集群有kerberos认证。 我是用1.2.1的方式连接hive的。hiveCatalog可以取到表结构,但在启动作业的时候报错,Standalone模式和yarn模式都报同样的错。 请问有人遇到过这种问题吗? 报错信息如下: ------------------------------------------------------------ The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 3f3033f7076c332529f3ac8250713889) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:243) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326) at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820) at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) at com.sean.HiveCatalogExample.main(HiveCatalogExample.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:211) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job. at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333) at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ... 6 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152) at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375) at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) ... 7 more Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Error in configuring object at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270) at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:897) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106) at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207) at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184) at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176) at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275) at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146) ... 10 more Caused by: java.lang.RuntimeException: Error in configuring object at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75) at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:161) at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:67) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256) ... 22 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106) ... 26 more Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found. at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135) at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175) at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45) ... 31 more Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060) at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128) ... 33 more End of exception on server side>] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ... 4 more 代码如下: ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment(); execEnv.setParallelism(1); BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv); HiveCatalog hiveCatalog = new HiveCatalog("hive_catalog", null, "/home/sean/120_conf", "1.2.1"); hiveCatalog.open(); tableEnv.registerCatalog("myhive", hiveCatalog); Optional<Catalog> myHive = tableEnv.getCatalog("myhive"); ObjectPath myTablePath = new ObjectPath("sean_test", "flink_test_01"); // 这里可以打印 System.out.println(myHive.get().getTable(myTablePath).getSchema()); tableEnv.useCatalog("myhive"); Table table = tableEnv.sqlQuery("select * from sean_test.flink_test_01"); List<Row> result = tableEnv.toDataSet(table, Row.class).collect(); System.out.println(result); tableEnv.execute(""); Hive相关的pom配置如下: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-metastore</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> <exclusions> <exclusion> <groupId>org.apache.thrift</groupId> <artifactId>libfb303</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop-2-uber</artifactId> <version>2.6.5-7.0</version> <scope>provided</scope> </dependency> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 |
In reply to this post by zhisheng
感谢各位大佬提供思路,我增加了lzo的jar后不再报这种错而且能取到hive表的数据了。
我以为在flink-shaded-hadoop-2-uber里面包含了所有hadoop相关的包所以没去考虑缺包的问题😂 附下缺少的pom内容: <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-lzo</artifactId> <version>0.4.13</version> </dependency> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 ________________________________ 发件人: zhisheng <[hidden email]> 发送时间: Saturday, August 10, 2019 5:58:02 PM 收件人: user-zh <[hidden email]> 主题: Re: flink1.10版本连接hive报错 hi 苏欣: 建议先检查一下最后打的 jar 包里面是否包含了 com.hadoop.compression.lzo.LzoCodec 和 com.hadoop.compression.lzo.LzoCodec 苏 欣 <[hidden email]> 于2019年8月9日周五 下午5:41写道: > 使用flink版本为1.10-snapshot,连接hive版本为1.1.0-cdh5.4.7,大数据集群有kerberos认证。 > > 我是用1.2.1的方式连接hive的。hiveCatalog可以取到表结构,但在启动作业的时候报错,Standalone模式和yarn模式都报同样的错。 > 请问有人遇到过这种问题吗? > > 报错信息如下: > ------------------------------------------------------------ > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: Could not > retrieve the execution result. (JobID: 3f3033f7076c332529f3ac8250713889) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:243) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) > at com.sean.HiveCatalogExample.main(HiveCatalogExample.java:49) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed > to submit JobGraph. > at > org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:211) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.runtime.rest.util.RestClientException: > [Internal server error., <Exception on server side: > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit > job. > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333) > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) > at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: > org.apache.flink.runtime.client.JobExecutionException: Could not set up > JobManager > at > org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36) > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) > ... 6 more > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could > not set up JobManager > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152) > at > org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83) > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375) > at > org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) > ... 7 more > Caused by: org.apache.flink.runtime.JobException: Creating the input > splits caused an error: Error in configuring object > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:897) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176) > at > org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) > at > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275) > at > org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146) > ... 10 more > Caused by: java.lang.RuntimeException: Error in configuring object > at > org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109) > at > org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75) > at > org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:161) > at > org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:67) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256) > ... 22 more > Caused by: java.lang.reflect.InvocationTargetException > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106) > ... 26 more > Caused by: java.lang.IllegalArgumentException: Compression codec > com.hadoop.compression.lzo.LzoCodec not found. > at > org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135) > at > org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175) > at > org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45) > ... 31 more > Caused by: java.lang.ClassNotFoundException: Class > com.hadoop.compression.lzo.LzoCodec not found > at > org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060) > at > org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128) > ... 33 more > > End of exception on server side>] > at > org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373) > at > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > ... 4 more > > > 代码如下: > > ExecutionEnvironment execEnv = > ExecutionEnvironment.getExecutionEnvironment(); > execEnv.setParallelism(1); > BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv); > > HiveCatalog hiveCatalog = new HiveCatalog("hive_catalog", null, > "/home/sean/120_conf", "1.2.1"); > hiveCatalog.open(); > > tableEnv.registerCatalog("myhive", hiveCatalog); > > Optional<Catalog> myHive = tableEnv.getCatalog("myhive"); > > ObjectPath myTablePath = new ObjectPath("sean_test", "flink_test_01"); > // 这里可以打印 > System.out.println(myHive.get().getTable(myTablePath).getSchema()); > > tableEnv.useCatalog("myhive"); > Table table = tableEnv.sqlQuery("select * from sean_test.flink_test_01"); > List<Row> result = tableEnv.toDataSet(table, Row.class).collect(); > System.out.println(result); > tableEnv.execute(""); > > > > > > Hive相关的pom配置如下: > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.hive</groupId> > <artifactId>hive-metastore</artifactId> > <version>1.2.1</version> > </dependency> > > <dependency> > <groupId>org.apache.hive</groupId> > <artifactId>hive-exec</artifactId> > <version>1.2.1</version> > <exclusions> > <exclusion> > <groupId>org.apache.thrift</groupId> > <artifactId>libfb303</artifactId> > </exclusion> > </exclusions> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-shaded-hadoop-2-uber</artifactId> > <version>2.6.5-7.0</version> > <scope>provided</scope> > </dependency> > > > > > 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 > > |
Free forum by Nabble | Edit this page |