Below is my program.

The table DDL and the job:

val hbaseTable =
  """
    |CREATE TABLE pvuv_sink(
    |  user_id varchar,
    |  f ROW<item_id varchar,category_id varchar,cnt BIGINT>
    |) WITH (
    |  'connector.type' = 'hbase',
    |  'connector.version' = '1.4.3',
    |  'connector.table-name' = 'test_shx',
    |  'connector.zookeeper.quorum' = 'docker-hbase:2181',
    |  'connector.zookeeper.znode.parent' = '/hbase'
    |)
  """.stripMargin

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
bsEnv.setParallelism(1)
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
bsTableEnv.sqlUpdate(hbaseTable)
bsTableEnv.execute("SQL Job")

The failure looks like this. At the job-graph stage, in HBaseRowInputFormat.java, the debugger shows the configuration fully populated:

this.conf = {Configuration@4841} "Configuration: core-default.xml, core-site.xml, hbase-default.xml, hbase-site.xml"
  quietmode = true
  allowNullValueProperties = false
  resources = {ArrayList@4859} size = 2
  finalParameters = {Collections$SetFromMap@4860} size = 0
  loadDefaults = true
  updatingResource = {ConcurrentHashMap@4861} size = 343
  properties = {Properties@4862} size = 343
  overlay = {Properties@4863} size = 2
  classLoader = {Launcher$AppClassLoader@4864}

At the executor stage, in InstantiationUtil.java (readObjectFromConfig), the deserialized object looks like this:

userCodeObject = {HBaseRowInputFormat@13658}
  tableName = "test_shx"
  schema = {HBaseTableSchema@13660}
  conf = null
  readHelper = null
  endReached = false
  table = null
  scan = null
  resultScanner = null
  currentRow = null
  scannedRows = 0
  runtimeContext = null

Any help would be greatly appreciated.

邵红晓 (Shao Hongxiao)
Email: [hidden email]
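For context, the program above only creates the HBase table and registers it; the path that instantiates HBaseRowInputFormat on the executor is a scan query over pvuv_sink. Below is a minimal sketch of such a read job, reusing the connector options already shown above; the pvuv_copy table, the test_shx_copy HBase table, and the job name are made up for illustration and are not part of the original mail.

// Hedged sketch -- pvuv_copy / test_shx_copy are illustrative names only.
val hbaseCopy =
  """
    |CREATE TABLE pvuv_copy(
    |  user_id varchar,
    |  f ROW<item_id varchar,category_id varchar,cnt BIGINT>
    |) WITH (
    |  'connector.type' = 'hbase',
    |  'connector.version' = '1.4.3',
    |  'connector.table-name' = 'test_shx_copy',
    |  'connector.zookeeper.quorum' = 'docker-hbase:2181',
    |  'connector.zookeeper.znode.parent' = '/hbase'
    |)
  """.stripMargin

bsTableEnv.sqlUpdate(hbaseCopy)
// Scanning pvuv_sink is the path that deserializes HBaseRowInputFormat on the executor.
bsTableEnv.sqlUpdate("INSERT INTO pvuv_copy SELECT user_id, f FROM pvuv_sink")
bsTableEnv.execute("Read Job")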
Hi, hongxiao
I gave it a try against my local HBase cluster and it works fine [1], so I could not reproduce your problem. What does your HBase cluster environment look like?

Best,
Leonard Xu

[1] https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33
Thanks. After some testing I found that writing to HBase succeeds; it is only reading that fails. Looking at the source code, in HBaseRowInputFormat (or a class around it) there is this declaration:

private transient Configuration conf;

Because the field is transient, conf is not serialized, which is why it comes back as null after deserialization. Could you try to reproduce it on your side?

邵红晓 (Shao Hongxiao)
Email: [hidden email]
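To illustrate the diagnosis: here is a minimal, self-contained sketch (a hypothetical FakeInputFormat class, not the Flink code itself) showing that a transient field populated at construction time comes back as null after a Java-serialization round trip, matching the executor-side snapshot earlier in the thread.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for an input format whose configuration lives in a
// transient field that is populated at construction time (client side).
@SerialVersionUID(1L)
class FakeInputFormat(confValue: String, val tableName: String) extends Serializable {
  // Mirrors `private transient Configuration conf;`
  @transient val conf: String = confValue
}

object TransientConfDemo {
  def main(args: Array[String]): Unit = {
    val original = new FakeInputFormat("core-site.xml, hbase-site.xml", "test_shx")

    // Serialize on the "client", as Flink does when building the job graph.
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(original)
    oos.close()

    // Deserialize on the "executor", as InstantiationUtil.readObjectFromConfig does.
    val copy = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[FakeInputFormat]

    println(original.conf)   // core-site.xml, hbase-site.xml
    println(copy.conf)       // null -- the transient field was skipped by serialization
    println(copy.tableName)  // test_shx -- non-transient fields survive
  }
}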
Hi, 邵红晓
I followed up on the relevant code, and the HBaseRowInputFormat implementation does indeed have a problem [1]. In the current implementation, HBaseTableSource only supports loading hbase-site.xml from the classpath as its configuration file; because of the serialization issue, most of the options in the DDL cannot be passed on to the InputFormat. I suggest writing your configuration into hbase-site.xml and adding it to the classpath:

if (this.conf == null) {
    this.conf = HBaseConfiguration.create();
}

The InputFormat contains this fallback logic, which discovers and loads the configuration from the classpath. In a local IDE you can also add hbase-site.xml to the classpath for verification and debugging; this works around the current problem.

The issue was found rather late for two reasons. First, few users use HBase as a source table; HBase is mostly used as a dimension table or a result table. Dimension tables go through HBaseLookupFunction and result tables go through HBaseUpsertSinkFunction, and both of those implementations are fine. Second, the HBase ITCase is implemented in a rather special way and does not cover DDL-based tests, and in production the configuration is usually supplied via the environment and added to the classpath. This code has been around for quite a while.

In any case, good catch. I have filed an issue [1] and will follow up with a fix.

Best,
Leonard Xu

[1] https://issues.apache.org/jira/browse/FLINK-17932
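To make the suggested workaround concrete, here is a small check, assuming hbase-client is available and an hbase-site.xml carrying your quorum and znode parent has been placed on the classpath (for example under src/main/resources); it prints what the fallback HBaseConfiguration.create() call would actually pick up. The expected values below are assumptions based on the DDL in this thread; adjust them for your environment.

import org.apache.hadoop.hbase.HBaseConfiguration

object ClasspathHBaseConfCheck {
  def main(args: Array[String]): Unit = {
    // Same call as the fallback in the InputFormat: loads hbase-default.xml
    // and hbase-site.xml from the classpath.
    val conf = HBaseConfiguration.create()
    println(conf.get("hbase.zookeeper.quorum"))  // expected to match your hbase-site.xml, e.g. docker-hbase
    println(conf.get("zookeeper.znode.parent"))  // expected: /hbase
  }
}

If the printed values match the cluster settings, the scan source should be able to connect even though the DDL options are lost during serialization.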