Error connecting Flink 1.10 to Hive


Error connecting Flink 1.10 to Hive

苏 欣
We are on Flink 1.10-SNAPSHOT, connecting to Hive 1.1.0-cdh5.4.7; the cluster uses Kerberos authentication.
I connect to Hive using the 1.2.1 compatibility version. The HiveCatalog can fetch the table schema, but the job fails at startup, with the same error in both standalone and YARN modes.
Has anyone run into this problem?

The error message is as follows:
------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 3f3033f7076c332529f3ac8250713889)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:243)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
        at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
        at com.sean.HiveCatalogExample.main(HiveCatalogExample.java:49)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:211)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
        ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
        at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
        ... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Error in configuring object
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:897)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
        at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
        ... 10 more
Caused by: java.lang.RuntimeException: Error in configuring object
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
        at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:161)
        at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:67)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256)
        ... 22 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
        ... 26 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
        at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
        ... 31 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
        ... 33 more

End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
       at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
        ... 4 more


The code is as follows:

import java.util.List;
import java.util.Optional;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(1);
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);

// Catalog pointed at the CDH client configs, using the 1.2.1 compatibility version.
HiveCatalog hiveCatalog = new HiveCatalog("hive_catalog", null, "/home/sean/120_conf", "1.2.1");
hiveCatalog.open();

tableEnv.registerCatalog("myhive", hiveCatalog);

Optional<Catalog> myHive = tableEnv.getCatalog("myhive");

ObjectPath myTablePath = new ObjectPath("sean_test", "flink_test_01");
// This prints the table schema successfully.
System.out.println(myHive.get().getTable(myTablePath).getSchema());

tableEnv.useCatalog("myhive");
Table table = tableEnv.sqlQuery("select * from sean_test.flink_test_01");
// The error surfaces here: collect() submits the job, which fails during input split creation.
List<Row> result = tableEnv.toDataSet(table, Row.class).collect();
System.out.println(result);
tableEnv.execute("");





The Hive-related POM configuration is as follows:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>

<dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-metastore</artifactId>
   <version>1.2.1</version>
</dependency>

<dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-exec</artifactId>
   <version>1.2.1</version>
   <exclusions>
      <exclusion>
         <groupId>org.apache.thrift</groupId>
         <artifactId>libfb303</artifactId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-shaded-hadoop-2-uber</artifactId>
   <version>2.6.5-7.0</version>
   <scope>provided</scope>
</dependency>






Re: Error connecting Flink 1.10 to Hive

zhisheng
Hi 苏欣,
I'd suggest first checking whether the final jar you build actually contains
com.hadoop.compression.lzo.LzoCodec.
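
For example, a minimal probe along these lines (a sketch; the class name comes from the stack trace, and CodecCheck is just an illustrative name), run with the same classpath the job uses:

// Probe whether the codec class is visible on the classpath.
public class CodecCheck {
    public static void main(String[] args) {
        try {
            Class.forName("com.hadoop.compression.lzo.LzoCodec");
            System.out.println("LzoCodec is on the classpath");
        } catch (ClassNotFoundException e) {
            System.out.println("LzoCodec is missing - bundle the hadoop-lzo jar");
        }
    }
}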


Re: Error connecting Flink 1.10 to Hive

athlon512@gmail.com
In reply to this post by 苏 欣
First, a question: are the files on your HDFS compressed with LZO?
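
Even if no table file is actually LZO-compressed, TextInputFormat constructs a CompressionCodecFactory, which tries to load every codec listed under io.compression.codecs in core-site.xml (on CDH clusters that list typically includes LzoCodec), so the class must be loadable either way. A sketch to see what is configured, assuming the config dir from the original post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class PrintCodecs {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Config dir taken from the HiveCatalog setup in the original post.
        conf.addResource(new Path("/home/sean/120_conf/core-site.xml"));
        // If this list names com.hadoop.compression.lzo.LzoCodec, the jar is required.
        System.out.println(conf.get("io.compression.codecs"));
    }
}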




Re: Error connecting Flink 1.10 to Hive

苏 欣
In reply to this post by zhisheng
Thanks everyone for the pointers. After adding the LZO jar, the error is gone and I can read data from the Hive table.

I had assumed flink-shaded-hadoop-2-uber bundled every Hadoop-related package, so a missing dependency never occurred to me 😂 Since the failure happened while the JobManager was creating the input splits (the server-side part of the stack trace), the codec presumably has to be loadable on the cluster side as well, which bundling the jar with the job takes care of.

The missing POM entry, for reference:

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-lzo</artifactId>
   <version>0.4.13</version>
</dependency>
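
As a quick sanity check (my own sketch, not strictly needed): building a CompressionCodecFactory against the cluster configuration fails with the same "Compression codec ... not found" error while the jar is missing, and passes once it is on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecFactoryCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/home/sean/120_conf/core-site.xml"));
        // Throws IllegalArgumentException, as in the stack trace, if a configured codec is missing.
        new CompressionCodecFactory(conf);
        System.out.println("all configured codecs resolved");
    }
}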







