flink-sql 1.11 + HBase integration: query performance issues


flink-sql 1.11 + HBase integration: query performance issues

大罗

Hi, I'd like to ask two questions about integrating flink-sql with HBase:
Question 1: When flink-sql reads HBase through a Hive catalog, can the query plan use the rowkey as a precise filter (a range scan) instead of doing a full table scan?

Question 2: When flink-sql queries a Hive external table backed by HBase, does it fail to resolve the correct INPUTFORMAT and OUTPUTFORMAT via HBaseStorageHandler?



The detailed context for both questions follows:

The versions I'm using:



Hadoop 3.0.0+cdh6.3.2

HDFS 3.0.0+cdh6.3.2

HBase 2.1.0+cdh6.3.2

Hive 2.1.1+cdh6.3.2

Flink 1.11.1



I have two tables in HBase, 't1' and 'econ_run_data_minutes'. Table 't1' has 2 rows,
while 'econ_run_data_minutes' has more than 100 million rows.



In flink-sql, using the Hive catalog, I define 'myHbaseT1' mapped to 't1'
and 'myHbaseMin' mapped to 'econ_run_data_minutes', as follows:



CREATE TABLE myHbaseT1 (

  rowkey string,

  f1 row (val VARCHAR, dqf VARCHAR),

  f2 row (val VARCHAR, dqf VARCHAR)

) WITH (

 'connector' = 'hbase-1.4',

 'table-name' = 't1',

 'zookeeper.quorum' =
'dev-hadoop-node-c:2181,dev-hadoop-node-d:2181,dev-hadoop-node-e:2181'

);





CREATE TABLE myHbaseMin (

  rowkey string,

  m1 row (val VARCHAR, dqf VARCHAR, ts TIMESTAMP(3)),

 PRIMARY KEY (rowkey) NOT ENFORCED

) WITH (

 'connector' = 'hbase-1.4',

 'table-name' = 'econ_run_data_minutes',

 'zookeeper.quorum' =
'dev-hadoop-node-c:2181,dev-hadoop-node-d:2181,dev-hadoop-node-e:2181'

);



Then, in the flink-sql client, I run the query "select * from myHbaseT1". Since the table has only two rows, the result comes back quickly:

Flink SQL> select * from myHbaseT1 ;

+-----+----------------------+----------------------+----------------------+

| +/- |               rowkey |                   f1 |                   f2 |

+-----+----------------------+----------------------+----------------------+

|   + | IDIN200F.d1.P-159... |            null,null |                100,0 |

|   + |                 row1 |                200,0 |            null,null |

+-----+----------------------+----------------------+----------------------+

Received a total of 2 rows



Flink SQL>

This test shows that my flink-sql environment can query HBase through the Hive catalog without any problem.



Next, I run the following query in the flink-sql client:

"select * from myhbasemin where rowkey >= 'IDIN200F.d1.P-1598493600000' and
rowkey <= 'IDIN200F.d1.P-1598497200000'"

The query actually returns fewer than 100 rows, yet it takes about half an hour, because Flink performs a full table scan. The execution plan and the key log lines are shown below:



Source: TableSourceScan(table=[[myhive, dw, myhbasemin]], fields=[rowkey,
m1]) -> Calc(select=[rowkey, m1], where=[((rowkey >=
_UTF-16LE'IDIN200F.d1.P-1598493600000') AND (rowkey <=
_UTF-16LE'IDIN200F.d1.P-1598497200000'))]) -> SinkConversionToTuple2 ->
Sink: SQL Client Stream Collect Sink





2020-08-28 11:20:24,501 INFO
org.apache.flink.connector.hbase.source.AbstractTableInputFormat [] -
opening split
(this=org.apache.flink.connector.hbase.source.HBaseRowDataInputFormat@5c382732)[1|[dev-hadoop-node-c:16020]|BSBMS.d1.cu3.061c.U-15935022|BSBMS.d1.cu3.147c.U-159092]

2020-08-28 11:20:49,086 INFO
org.apache.flink.connector.hbase.source.AbstractTableInputFormat [] -
Closing split (scanned 4410553 rows)



What I expected is that flink-sql would pick up the rowkey predicates and turn them into a precise HBase range scan using StartRow/EndRow, instead of scanning the whole table and filtering inside the Flink task managers. A rough sketch of what I mean follows.
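
To make that expectation concrete, here is a hypothetical sketch (my own illustration, not the connector's actual code) of the kind of range scan I would expect the rowkey predicates to be translated into, using the plain HBase client API; the table name, rowkey bounds and ZooKeeper hosts are taken from the examples above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RowkeyRangeScan {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum",
                "dev-hadoop-node-c,dev-hadoop-node-d,dev-hadoop-node-e");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("econ_run_data_minutes"))) {
            Scan scan = new Scan();
            // Only scan the rowkey range instead of the whole table.
            scan.setStartRow(Bytes.toBytes("IDIN200F.d1.P-1598493600000"));
            // The stop row is exclusive, so use a key just past the upper bound.
            scan.setStopRow(Bytes.toBytes("IDIN200F.d1.P-1598497200001"));
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    System.out.println(Bytes.toString(result.getRow()));
                }
            }
        }
    }
}

With a scan like this, HBase itself only touches the regions covering the rowkey range, which is why I would expect the same query to come back in seconds rather than half an hour.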

Hence Question 1: When flink-sql reads HBase through a Hive catalog, can the query plan use the rowkey as a precise filter instead of a full table scan?



Building on Question 1, I wondered whether I could integrate HBase through a Hive external table and then query that external table from flink-sql to get fast, precise lookups. The experiment went as follows:



First, define an external table in Hive that maps to HBase:



CREATE EXTERNAL TABLE `hive_hbase_t1`(

  `m_key` string COMMENT '',

  `f1_val` string COMMENT '',

  `f1_dqf` int COMMENT '',

  `f2_val` string COMMENT '',

  `f2_dqf` int COMMENT '',

  `ts` timestamp COMMENT '')

ROW FORMAT SERDE

  'org.apache.hadoop.hive.hbase.HBaseSerDe'

STORED BY

  'org.apache.hadoop.hive.hbase.HBaseStorageHandler'

  WITH SERDEPROPERTIES (

   'hbase.columns.mapping'=':key,f1:val,f1:dqf,f2:val,f2:dqf, :timestamp',

   'serialization.format'='1')



TBLPROPERTIES (

  'hbase.table.name'='t1'

);



Then query it in Hive with "select * from hive_hbase_t1":



hive> select * from hive_hbase_t1 ;

IDIN200F.d1.P-1593484146001 NULL NULL 100 0 2020-06-30 10:56:07.983

row1 200 0 NULL NULL 2020-06-30 10:54:56.558

hive>

This shows that in my environment, Hive can query HBase through the external table.



Next, I try the same query in flink-sql, "select * from hive_hbase_t1":

Flink SQL>  select * from hive_hbase_t1;

Caused by: java.lang.NullPointerException

        at java.lang.Class.forName0(Native Method)

        at java.lang.Class.forName(Class.java:348)

        at
org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:305)

        ... 24 more



End of exception on server side>]



Flink SQL>



After some debugging, I found that when org.apache.flink.connectors.hive.read.HiveTableInputFormat creates the input splits for the table, it cannot find the inputFormat of the StorageDescriptor (sd.getInputFormat() returns null).

The failing code is in org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits:

format = (InputFormat)Class.forName(sd.getInputFormat(), true,
Thread.currentThread().getContextClassLoader()).newInstance();



In fact, I do have "org.apache.hive:hive-hbase-handler:2.1.1" on the classpath.

But clearly flink-sql does not make use of the "STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" clause defined on 'hive_hbase_t1',

and therefore never finds the correct INPUTFORMAT
'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat' and OUTPUTFORMAT
'org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat'.
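
As a side check, here is a small hypothetical diagnostic sketch (my own code, not part of Flink; the metastore URI and the "default" database name are assumptions) that asks the Hive metastore what it actually stores for this table. For a STORED BY table I would expect a storage_handler entry in the table parameters but possibly no usable inputFormat in the StorageDescriptor, which matches the NullPointerException above:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

public class InspectHiveHBaseTable {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // Assumed metastore URI; adjust to the real environment.
        conf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://dev-hadoop-node-c:9083");
        HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
        try {
            // Assumed database name "default".
            Table t = client.getTable("default", "hive_hbase_t1");
            StorageDescriptor sd = t.getSd();
            // Storage-handler tables record the handler class as a table parameter ...
            System.out.println("storage_handler = " + t.getParameters().get("storage_handler"));
            // ... while the StorageDescriptor's input/output format may be null,
            // which is what Class.forName(sd.getInputFormat(), ...) trips over.
            System.out.println("inputFormat     = " + sd.getInputFormat());
            System.out.println("outputFormat    = " + sd.getOutputFormat());
        } finally {
            client.close();
        }
    }
}

If inputFormat is indeed null there, that would support the guess that the storage-handler definition is not being turned into a usable INPUTFORMAT/OUTPUTFORMAT.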



Hence my Question 2: when flink-sql queries a Hive external table backed by HBase, does it fail to resolve the correct INPUTFORMAT and OUTPUTFORMAT via HBaseStorageHandler?







Re: flink-sql 1.11 + HBase integration: query performance issues

Leonard Xu
Hi, 大罗

> On Aug 28, 2020, at 11:57, 大罗 <[hidden email]> wrote:
>
>
> Hi, I'd like to ask two questions about integrating flink-sql with HBase:
> Question 1: When flink-sql reads HBase through a Hive catalog, can the query plan use the rowkey as a precise filter instead of a full table scan?
>
> Question 2: When flink-sql queries a Hive external table backed by HBase, does it
> fail to resolve the correct INPUTFORMAT and OUTPUTFORMAT via HBaseStorageHandler?
>

Question 1: HbaseTableSource does not support SupportsFilterPushDown yet, so the filter conditions you wrote are not pushed down into the source, and a full table scan is performed.
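For reference, a minimal sketch of what such support could look like on a scan source, assuming the Flink 1.11 ability interface org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; the HBaseRangeSource class and its toRowkeyRange helper are hypothetical, not existing connector code:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

public abstract class HBaseRangeSource implements ScanTableSource, SupportsFilterPushDown {

    // Hypothetical state: the rowkey range the source will actually scan.
    private byte[] startRow;
    private byte[] stopRow;

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            // toRowkeyRange is a hypothetical helper that recognizes
            // rowkey >= / <= comparisons and narrows [startRow, stopRow).
            if (toRowkeyRange(filter)) {
                accepted.add(filter);
            } else {
                remaining.add(filter);
            }
        }
        // Filters the source cannot apply stay in "remaining" and are still
        // evaluated by the runtime after the scan.
        return Result.of(accepted, remaining);
    }

    protected abstract boolean toRowkeyRange(ResolvedExpression filter);
}

The planner calls applyFilters during optimization, and the source can then translate the accepted expressions into the scan's start/stop rows.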

Question 2: Flink does support reading Hive external tables; if yours cannot be read, that is a bug. Could you paste the full exception stack trace? I can take a look and open an issue.

Best regards,
Leonard




Re: flink-sql 1.11 + HBase integration: query performance issues

大罗
Hi Leonard,

I want to emphasize that if I define an external table in Hive that reads a specific HDFS location, e.g.
"hive_external_table", as follows:

CREATE EXTERNAL TABLE `hive_external_table`(
  `sid` int,
  `sname` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'field.delim'=',')
LOCATION
  'hdfs://nameservice1:8020/opt/user/hive/warehouse/external_table/s'

then I can indeed query it from flink-sql:
Flink SQL> select * from hive_external_table ;
+-----+-------------+----------------------+
| +/- |         sid |                sname |
+-----+-------------+----------------------+
|   + |           2 |               monica |
|   + |           1 |                daluo |
+-----+-------------+----------------------+
Received a total of 2 rows

Flink SQL>

The problem now is that flink-sql cannot use the HBase external table defined in Hive.

Also, is there a plan for when HbaseTableSource will support SupportsFilterPushDown?

关于"select * from hive_hbase_t1"的异常日志如下。


Flink SQL> select * from hive_hbase_t1;
2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf                        
[] - HiveConf of name hive.vectorized.use.checked.expressions does not exist
2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf                        
[] - HiveConf of name hive.strict.checks.no.partition.filter does not exist
2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf                        
[] - HiveConf of name hive.strict.checks.orderby.no.limit does not exist
2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf                        
[] - HiveConf of name hive.vectorized.input.format.excludes does not exist
2020-08-28 13:20:19,986 WARN  org.apache.hadoop.hive.conf.HiveConf                        
[] - HiveConf of name hive.strict.checks.bucketing does not exist
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
job.
        at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
        at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
        at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
        at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
instantiate JobManager.
        at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
        at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        ... 6 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits
caused an error: Unable to instantiate the hadoop input format
        at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:272)
        at
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814)
        at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
        at
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
        at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
        at
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
        at
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
        at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
        at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
        at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
        at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
        at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
        at
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
        at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
        ... 7 more
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Unable to
instantiate the hadoop input format
        at
org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:307)
        at
org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:282)
        at
org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:66)
        at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:258)
        ... 21 more
Caused by: java.lang.NullPointerException
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at
org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:305)
        ... 24 more

End of exception on server side>]

Flink SQL>




Re: flink-sql 1.11 + HBase integration: query performance issues

Leonard Xu
Hi

> Also, is there a plan for when HbaseTableSource will support SupportsFilterPushDown?
I searched and the community does not have a related issue yet. If this is a strong requirement for you, you can open an issue [1] and ask the community to support it.
For the second exception stack: if you have confirmed that "org.apache.hive:hive-hbase-handler:2.1.1" is loaded, this looks like a bug to me; cc Rui Li to confirm.

Best regards,
Leonard
[1] https://issues.apache.org/jira/projects/FLINK/summary


Re: flink-sql 1.11 + HBase integration: query performance issues

大罗
How do I open an issue in the community for "HbaseTableSource supports SupportsFilterPushDown"? Is there a guideline?

另外,"第二个异常栈,如果确认”org.apache.hive:hive-hbase-handler:2.1.1” 已经加载,我感觉是个bug, cc
Rui Li 确认下",你说的"Rui Li",请问,我要怎么抄送,可以给他的邮件吗,还是你已经抄送给他。




Re: flink-sql 1.11 + HBase integration: query performance issues

Rui Li
Hello,

The Hive connector currently does not support Hive storage handler tables [1]; in other words, tables defined with STORED BY are not supported at the moment. Ordinary external tables are supported.

[1]
https://cwiki.apache.org/confluence/display/Hive/StorageHandlers#StorageHandlers-DDL


--
Best regards!
Rui Li

Re: flink-sql 1.11 + HBase integration: query performance issues

大罗
Hi Rui Li,

Thanks for your reply.

Also, I have opened an issue in the Apache Flink JIRA: "flink sql 1.11 HbaseTableSource Supports
FilterPushDown"

https://issues.apache.org/jira/browse/FLINK-19088

I hope it can be delivered soon to improve the efficiency of the flink sql 1.11 + HBase integration. Thanks!



