
Local test, Flink 1.10 HBase SQL CREATE TABLE: conf is null on the object obtained after byte deserialization, so the HBase cluster cannot be connected

4 messages

Local test, Flink 1.10 HBase SQL CREATE TABLE: conf is null on the object obtained after byte deserialization, so the HBase cluster cannot be connected

shao.hongxiao
17 posts
Below is my program.

SQL:
val hbaseTable =
  """
    |CREATE TABLE pvuv_sink(
    |    user_id varchar,
    |    f ROW<item_id varchar, category_id varchar, cnt BIGINT>
    |) WITH (
    |    'connector.type' = 'hbase',
    |    'connector.version' = '1.4.3',
    |    'connector.table-name' = 'test_shx',
    |    'connector.zookeeper.quorum' = 'docker-hbase:2181',
    |    'connector.zookeeper.znode.parent' = '/hbase'
    |)
  """.stripMargin

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
bsEnv.setParallelism(1)
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
bsTableEnv.sqlUpdate(hbaseTable)
bsTableEnv.execute("SQL Job")


The error:

At the job graph stage, in HBaseRowInputFormat.java, conf is populated:
this.conf = {Configuration@4841} "Configuration: core-default.xml, core-site.xml, hbase-default.xml, hbase-site.xml"
quietmode = true
allowNullValueProperties = false
resources = {ArrayList@4859}  size = 2
finalParameters = {Collections$SetFromMap@4860}  size = 0
loadDefaults = true
updatingResource = {ConcurrentHashMap@4861}  size = 343
properties = {Properties@4862}  size = 343
overlay = {Properties@4863}  size = 2
classLoader = {Launcher$AppClassLoader@4864}

At the executor stage, after InstantiationUtil.readObjectFromConfig, conf is null:
userCodeObject = {HBaseRowInputFormat@13658}
tableName = "test_shx"
schema = {HBaseTableSchema@13660}
conf = null
readHelper = null
endReached = false
table = null
scan = null
resultScanner = null
currentRow = null
scannedRows = 0
runtimeContext = null


Any help would be much appreciated.



邵红晓
Email: [hidden email]

Re: Local test, Flink 1.10 HBase SQL CREATE TABLE: conf is null on the object obtained after byte deserialization, so the HBase cluster cannot be connected

Leonard Xu
339 posts
Hi, hongxiao

I gave this a try against my local HBase cluster and it works [1]; I couldn't reproduce your problem. What does your HBase cluster environment look like?

Best,
Leonard Xu
[1] https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33 <https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33>

> On May 22, 2020, at 11:48, shao.hongxiao <[hidden email]> wrote:
> (quoted message elided)


Re: Local test, Flink 1.10 HBase SQL CREATE TABLE: conf is null on the object obtained after byte deserialization, so the HBase cluster cannot be connected

shao.hongxiao
17 posts
Thanks. After more testing I found that writing to HBase succeeds; it is only reading that fails. Looking at the source, in a class along the lines of HBaseRowInputFormat there is this line:

private transient Configuration conf;

Because the field is transient, conf is never serialized, which is why it comes back null after deserialization. You should be able to reproduce it with this.
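The effect described above can be reproduced with plain Java serialization alone. The sketch below is illustrative, not the Flink code: the class and field names merely mimic HBaseRowInputFormat, and java.util.Properties stands in for Hadoop's Configuration. It assumes the InputFormat is shipped via standard Java serialization, which is what InstantiationUtil.readObjectFromConfig ultimately relies on.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

public class TransientConfDemo {
    // Illustrative stand-in for HBaseRowInputFormat: the conf field is
    // marked transient, so Java serialization skips it entirely.
    static class FakeInputFormat implements Serializable {
        String tableName = "test_shx";
        transient Properties conf = new Properties(); // like `private transient Configuration conf;`
    }

    // Serialize to bytes and read back, roughly what happens when the
    // job graph is shipped to the executor.
    static FakeInputFormat roundTrip(FakeInputFormat in) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(in);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (FakeInputFormat) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeInputFormat copy = roundTrip(new FakeInputFormat());
        System.out.println(copy.tableName); // survives: "test_shx"
        System.out.println(copy.conf);      // transient field comes back null
    }
}
```

This matches the debugger dumps above: the non-transient tableName and schema survive the round trip, while conf is null on the executor side.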


邵红晓
Email: [hidden email]

On May 23, 2020, at 00:19, Leonard Xu wrote:
> (quoted message elided)


Re: Local test, Flink 1.10 HBase SQL CREATE TABLE: conf is null on the object obtained after byte deserialization, so the HBase cluster cannot be connected

Leonard Xu
339 posts
Hi, 邵红晓

I followed up on the relevant code, and the HBaseRowInputFormat implementation is indeed broken [1]. As currently implemented, HBaseTableSource only supports loading hbase-site.xml from the classpath as its configuration file; most of the options in the DDL cannot be passed on to the InputFormat because of serialization. I suggest writing your configuration into hbase-site.xml and adding it to the classpath:
if (this.conf == null) {
   this.conf = HBaseConfiguration.create();
}
The InputFormat contains a fallback path that discovers and loads the configuration from the classpath; in a local IDE you can likewise add hbase-site.xml to the classpath for verification and debugging, which works around the current problem.
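Beyond the classpath workaround, a common way to fix this class of bug is to persist the configuration's contents by hand in writeObject/readObject so they survive the trip to the executor. The sketch below only illustrates that pattern; it is not the actual FLINK-17932 patch, and it uses java.util.Properties as a stand-in for Hadoop's Configuration (which is not java.io.Serializable).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Pattern sketch: keep the field transient, but write its entries out
// explicitly so they can be restored on deserialization.
public class SerializableConf implements Serializable {
    private transient Properties conf = new Properties();

    public Properties getConf() { return conf; }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Persist the entries as a plain serializable map.
        out.writeObject(new HashMap<Object, Object>(conf));
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        conf = new Properties();
        conf.putAll((Map<Object, Object>) in.readObject());
    }

    public static void main(String[] args) throws Exception {
        SerializableConf original = new SerializableConf();
        original.getConf().setProperty("hbase.zookeeper.quorum", "docker-hbase:2181");

        // Round-trip through Java serialization.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(original);
        }
        SerializableConf copy;
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            copy = (SerializableConf) ois.readObject();
        }
        // The setting survives even though the field itself is transient.
        System.out.println(copy.getConf().getProperty("hbase.zookeeper.quorum"));
    }
}
```

With this pattern the DDL options would reach the executor instead of silently falling back to whatever hbase-site.xml is on the classpath.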

This problem was caught late for two reasons. First, few users use HBase as a source table; HBase is mostly used for dimension tables and result tables, which go through HBaseLookupFunction and HBaseUpsertSinkFunction respectively, and both of those implementations are fine. Second, the HBase ITCase setup is rather special and does not cover the DDL path; in production the configuration usually comes from the environment via the classpath. This code has actually been around for quite a while.

In short, good catch. I have filed an issue and will follow up with a fix.


Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-17932 <https://issues.apache.org/jira/browse/FLINK-17932>


> On May 23, 2020, at 08:01, shao.hongxiao <[hidden email]> wrote:
> (quoted message elided)