Hello, I would like to ask two questions about integrating Flink SQL with HBase:

Question 1: when Flink SQL integrates HBase through the Hive catalog, can the query execution plan use the rowkey as a precise filter instead of doing a full table scan?

Question 2: when Flink SQL queries a Hive external table backed by HBase, is it failing to resolve the INPUTFORMAT and OUTPUTFORMAT through HBaseStorageHandler?

The details behind both questions are as follows.

Versions I am using:

Hadoop 3.0.0+cdh6.3.2
HDFS   3.0.0+cdh6.3.2
HBase  2.1.0+cdh6.3.2
Hive   2.1.1+cdh6.3.2
Flink  1.11.1

I have two tables in HBase, 't1' and 'econ_run_data_minutes'. Table 't1' contains 2 rows, while 'econ_run_data_minutes' contains more than 100 million rows.

In Flink SQL, using the Hive catalog, I defined 'myHbaseT1' mapping to 't1' and 'myHbaseMin' mapping to 'econ_run_data_minutes':

CREATE TABLE myHbaseT1 (
  rowkey string,
  f1 row (val VARCHAR, dqf VARCHAR),
  f2 row (val VARCHAR, dqf VARCHAR)
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 't1',
  'zookeeper.quorum' = 'dev-hadoop-node-c:2181,dev-hadoop-node-d:2181,dev-hadoop-node-e:2181'
);

CREATE TABLE myHbaseMin (
  rowkey string,
  m1 row (val VARCHAR, dqf VARCHAR, ts TIMESTAMP(3)),
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'econ_run_data_minutes',
  'zookeeper.quorum' = 'dev-hadoop-node-c:2181,dev-hadoop-node-d:2181,dev-hadoop-node-e:2181'
);

Then, in the Flink SQL client, I ran "select * from myHbaseT1". Since the table only has two rows, the result came back quickly:

Flink SQL> select * from myHbaseT1 ;
+-----+----------------------+----------------------+----------------------+
| +/- |               rowkey |                   f1 |                   f2 |
+-----+----------------------+----------------------+----------------------+
|   + | IDIN200F.d1.P-159... |            null,null |                100,0 |
|   + |                 row1 |                200,0 |            null,null |
+-----+----------------------+----------------------+----------------------+
Received a total of 2 rows

Flink SQL>

This test shows that my Flink SQL environment can query HBase through the Hive catalog without any problem.

Next, in the Flink SQL client I ran:

select * from myhbasemin where rowkey >= 'IDIN200F.d1.P-1598493600000' and rowkey <= 'IDIN200F.d1.P-1598497200000'

The query only returns fewer than 100 rows, yet it takes roughly half an hour, because Flink performs a full table scan. The execution plan and the key log lines are:

Source: TableSourceScan(table=[[myhive, dw, myhbasemin]], fields=[rowkey, m1]) -> Calc(select=[rowkey, m1], where=[((rowkey >= _UTF-16LE'IDIN200F.d1.P-1598493600000') AND (rowkey <= _UTF-16LE'IDIN200F.d1.P-1598497200000'))]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink

2020-08-28 11:20:24,501 INFO  org.apache.flink.connector.hbase.source.AbstractTableInputFormat [] - opening split (this=org.apache.flink.connector.hbase.source.HBaseRowDataInputFormat@5c382732)[1|[dev-hadoop-node-c:16020]|BSBMS.d1.cu3.061c.U-15935022|BSBMS.d1.cu3.147c.U-159092]
2020-08-28 11:20:49,086 INFO  org.apache.flink.connector.hbase.source.AbstractTableInputFormat [] - Closing split (scanned 4410553 rows)

What I expected is that Flink SQL would pick up this rowkey predicate and perform a precise scan in HBase using StartRow/EndRow, instead of scanning the whole table and filtering in the Flink task managers.
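For comparison, this is the kind of precise scan I have in mind, written against the plain HBase client. It is a rough, untested sketch: only the table name, column family, rowkey range and ZooKeeper hosts come from the setup above, the rest is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyRangeScan {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Same ZooKeeper hosts as in the 'zookeeper.quorum' option above.
        conf.set("hbase.zookeeper.quorum",
                "dev-hadoop-node-c,dev-hadoop-node-d,dev-hadoop-node-e");

        // Scan only the requested rowkey range; the stop row is inclusive,
        // matching the "<=" predicate in the SQL query.
        Scan scan = new Scan()
                .withStartRow(Bytes.toBytes("IDIN200F.d1.P-1598493600000"))
                .withStopRow(Bytes.toBytes("IDIN200F.d1.P-1598497200000"), true)
                .addFamily(Bytes.toBytes("m1"));

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("econ_run_data_minutes"));
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                System.out.println(Bytes.toString(result.getRow()));
            }
        }
    }
}

A scan like this touches only the region(s) covering the range and returns in seconds, which is the behaviour I would hope the pushed-down filter could produce.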
So, question 1: when Flink SQL integrates HBase through the Hive catalog, can the query execution plan use the rowkey as a precise filter instead of doing a full table scan?

Building on question 1, I wondered whether I could integrate HBase through a Hive external table and then query that external table from Flink SQL to get fast, precise lookups. The experiment went as follows.

First, define an external table in Hive on top of HBase:

CREATE EXTERNAL TABLE `hive_hbase_t1`(
  `m_key` string COMMENT '',
  `f1_val` string COMMENT '',
  `f1_dqf` int COMMENT '',
  `f2_val` string COMMENT '',
  `f2_dqf` int COMMENT '',
  `ts` timestamp COMMENT '')
ROW FORMAT SERDE
  'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY
  'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  'hbase.columns.mapping'=':key,f1:val,f1:dqf,f2:val,f2:dqf, :timestamp',
  'serialization.format'='1')
TBLPROPERTIES (
  'hbase.table.name'='t1'
);

Then query it in Hive with "select * from hive_hbase_t1":

hive> select * from hive_hbase_t1 ;
IDIN200F.d1.P-1593484146001    NULL    NULL    100    0    2020-06-30 10:56:07.983
row1    200    0    NULL    NULL    2020-06-30 10:54:56.558
hive>

This shows that in my environment Hive can query HBase through the external table.

Next, I tried the same query from Flink SQL, "select * from hive_hbase_t1":

Flink SQL> select * from hive_hbase_t1;

Caused by: java.lang.NullPointerException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:305)
    ... 24 more

End of exception on server side>]

Flink SQL>

After some debugging I found that when org.apache.flink.connectors.hive.read.HiveTableInputFormat creates the input splits, it cannot find the inputFormat of the table's StorageDescriptor. The failing line is in org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits:

format = (InputFormat)Class.forName(sd.getInputFormat(), true, Thread.currentThread().getContextClassLoader()).newInstance();

I do have "org.apache.hive:hive-hbase-handler:2.1.1" on the classpath. But apparently Flink SQL does not make use of the "STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" clause in the definition of 'hive_hbase_t1', and therefore never resolves the correct INPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat' and OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat'.

So, question 2: when Flink SQL queries a Hive external table backed by HBase, is it failing to resolve the INPUTFORMAT and OUTPUTFORMAT through HBaseStorageHandler?
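To back up the analysis above (that sd.getInputFormat() comes back null for the storage-handler table), a small metastore probe along the following lines could be used. This is only a sketch I have not wired into my setup; the database name and the "storage_handler" parameter key are assumptions on my part.

import java.util.Map;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public class InspectHiveHbaseTable {
    public static void main(String[] args) throws Exception {
        // Assumes hive-site.xml is on the classpath so HiveConf can locate the metastore.
        HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
        try {
            // Adjust the database name as needed ("default" is a guess here).
            Table table = client.getTable("default", "hive_hbase_t1");

            // For a STORED BY table the StorageDescriptor typically carries no
            // input/output format, which is exactly what Class.forName() chokes on.
            System.out.println("inputFormat  = " + table.getSd().getInputFormat());
            System.out.println("outputFormat = " + table.getSd().getOutputFormat());

            // The handler itself is recorded as a table parameter
            // (key assumed to be "storage_handler").
            Map<String, String> params = table.getParameters();
            System.out.println("storage_handler = " + params.get("storage_handler"));
        } finally {
            client.close();
        }
    }
}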
Hi, 大罗
> On 28 Aug 2020, at 11:57, 大罗 <[hidden email]> wrote:
>
> Hello, I would like to ask two questions about integrating Flink SQL with HBase:
> Question 1: when Flink SQL integrates HBase through the Hive catalog, can the query execution plan use the rowkey as a precise filter instead of doing a full table scan?
>
> Question 2: when Flink SQL queries a Hive external table backed by HBase, is it failing to resolve the INPUTFORMAT and OUTPUTFORMAT through HBaseStorageHandler?

Question 1: HbaseTableSource does not support SupportsFilterPushDown yet, so the filter you wrote is not pushed down into the source and a full table scan is performed.

Question 2: Flink does support reading Hive external tables; if yours cannot be read, that is a bug. Could you paste the full exception stack? I will take a look and open an issue.

Best regards,
Leonard
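For reference, on the FLIP-95 DynamicTableSource interfaces the missing ability looks roughly like the sketch below. This is not the actual HBase connector code: the helper method and the rowkey handling are hypothetical, it only shows the shape of applyFilters, and it assumes the planner wires the ability up for the source.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;

// Hypothetical sketch: a source that accepts rowkey range predicates and leaves
// everything else to the planner. A real HBase source would also translate the
// accepted literals into the scan's start/stop row.
public class RowKeyFilterPushDownSketch implements SupportsFilterPushDown {

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            if (isRowKeyRangePredicate(filter)) {
                accepted.add(filter);   // applied inside the source
            } else {
                remaining.add(filter);  // stays as a Calc in the plan
            }
        }
        return Result.of(accepted, remaining);
    }

    // Hypothetical helper: only treats simple comparison calls as pushable.
    // A complete version would also check that the compared field is the rowkey
    // column and that the other operand is a literal.
    private boolean isRowKeyRangePredicate(ResolvedExpression expr) {
        if (!(expr instanceof CallExpression)) {
            return false;
        }
        FunctionDefinition def = ((CallExpression) expr).getFunctionDefinition();
        return def == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL
                || def == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL
                || def == BuiltInFunctionDefinitions.EQUALS;
    }
}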
Hi Leonard,
I would like to stress that if I define an external table in Hive over a plain HDFS location, for example "hive_external_table":

CREATE EXTERNAL TABLE `hive_external_table`(
  `sid` int,
  `sname` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'field.delim'=',')
LOCATION
  'hdfs://nameservice1:8020/opt/user/hive/warehouse/external_table/s'

then I can indeed query it from Flink SQL:

Flink SQL> select * from hive_external_table ;
+-----+-------------+----------------------+
| +/- |         sid |                sname |
+-----+-------------+----------------------+
|   + |           2 |               monica |
|   + |           1 |                daluo |
+-----+-------------+----------------------+
Received a total of 2 rows

Flink SQL>

What Flink SQL cannot use is the HBase external table defined in Hive.

Also, is there any plan for when HbaseTableSource will support SupportsFilterPushDown?

The full exception log for "select * from hive_hbase_t1" is below.

Flink SQL> select * from hive_hbase_t1;
2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf [] - HiveConf of name hive.vectorized.use.checked.expressions does not exist
2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf [] - HiveConf of name hive.strict.checks.no.partition.filter does not exist
2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf [] - HiveConf of name hive.strict.checks.orderby.no.limit does not exist
2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf [] - HiveConf of name hive.vectorized.input.format.excludes does not exist
2020-08-28 13:20:19,986 WARN  org.apache.hadoop.hive.conf.HiveConf [] - HiveConf of name hive.strict.checks.bucketing does not exist
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 6 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Unable to instantiate the hadoop input format
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:272)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
    ... 7 more
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop input format
    at org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:307)
    at org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:282)
    at org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:66)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:258)
    ... 21 more
Caused by: java.lang.NullPointerException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:305)
    ... 24 more

End of exception on server side>]

Flink SQL>
Hi
> Also, is there any plan for when HbaseTableSource will support SupportsFilterPushDown?

I searched and there is no related issue in the community yet. If this is a hard requirement for you, you can open an issue [1] so the community can support it.

As for the second exception stack: if you have confirmed that "org.apache.hive:hive-hbase-handler:2.1.1" is loaded, this looks like a bug to me; cc Rui Li to confirm.

Best regards,
Leonard

[1] https://issues.apache.org/jira/projects/FLINK/summary
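A throwaway way to confirm the handler classes really are visible to the client JVM might look like the following; the class names are simply the ones mentioned earlier in this thread, and this is only a quick sketch.

// Throwaway check: prints whether the hive-hbase-handler classes referenced in
// this thread can be loaded by the current JVM / classloader.
public class HandlerClasspathCheck {
    public static void main(String[] args) {
        String[] classes = {
            "org.apache.hadoop.hive.hbase.HBaseStorageHandler",
            "org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat",
            "org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat"
        };
        for (String name : classes) {
            try {
                Class.forName(name);
                System.out.println("found:   " + name);
            } catch (ClassNotFoundException e) {
                System.out.println("missing: " + name);
            }
        }
    }
}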
Could you tell me how to create the issue "HbaseTableSource supports SupportsFilterPushDown" in the community? Is there a guideline?
另外,"第二个异常栈,如果确认”org.apache.hive:hive-hbase-handler:2.1.1” 已经加载,我感觉是个bug, cc Rui Li 确认下",你说的"Rui Li",请问,我要怎么抄送,可以给他的邮件吗,还是你已经抄送给他。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hello,
Currently the Hive connector does not support Hive's storage handler tables [1]; in other words, tables defined with STORED BY are not supported at the moment. Plain external tables are supported.

[1] https://cwiki.apache.org/confluence/display/Hive/StorageHandlers#StorageHandlers-DDL

On Fri, Aug 28, 2020 at 2:43 PM Leonard Xu <[hidden email]> wrote:

> As for the second exception stack: if you have confirmed that "org.apache.hive:hive-hbase-handler:2.1.1" is loaded, this looks like a bug to me; cc Rui Li to confirm.
--
Best regards!
Rui Li
Hi Rui Li,
Thank you for your reply.

I have also opened an issue in the Apache Flink JIRA, "flink sql 1.11 HbaseTableSource Supports FilterPushDown":

https://issues.apache.org/jira/browse/FLINK-19088

I hope it can be released soon, to improve the efficiency of the Flink SQL 1.11 and HBase integration. Thanks!