发件人: [hidden email]发送时间: 2020-03-06 17:44收件人: [hidden email]主题: 如何通过Flink SQL注册Hbase源表 各位好, 最近公司想用flink来做流式计算,在研究Flink SQL读写HBase的时候遇到一些问题,希望能从您们那里得到帮忙。我在Hbase的默认命名空间里,创建了一个resume表,表结构如下:我的Flink测试代码如下: @Test public void testReadFromHBase() throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// HBaseTableSource resume = new HBaseTableSource(); tableEnv.sqlUpdate("create table resume(\n" + " binfo ROW<>,\n" + " edu ROW<>, \n" + " work ROW<> \n" + ") with (" + " 'connector.type' = 'hbase', " + " 'connector.version' = '1.4.3', " + " 'connector.table-name' = 'resume'," + " 'connector.zookeeper.quorum' = 'flink01.flink.net:2181,flink02.flink:2181,flink03.flink:2181'," + " 'connector.zookeeper.znode.parent' = '/hbase'" + ")"); Table table = tableEnv.sqlQuery("select * from resume"); DataStream<Tuple2<Boolean, Row>> out = tableEnv.toRetractStream(table, Row.class); out.print(); env.execute(); }运行报下面的错误:org.apache.flink.table.api.ValidationException: Could not map binfo column to the underlying physical type root. No such field. at org.apache.flink.table.utils.TypeMappingUtils.lambda$null$7(TypeMappingUtils.java:223) at java.util.OptionalInt.orElseThrow(OptionalInt.java:189) [hidden email]
@Test public void testReadFromHBase() throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// HBaseTableSource resume = new HBaseTableSource(); tableEnv.sqlUpdate("create table resume(\n" + " binfo ROW<>,\n" + " edu ROW<>, \n" + " work ROW<> \n" + ") with (" + " 'connector.type' = 'hbase', " + " 'connector.version' = '1.4.3', " + " 'connector.table-name' = 'resume'," + " 'connector.zookeeper.quorum' = 'flink01.flink.net:2181,flink02.flink:2181,flink03.flink:2181'," + " 'connector.zookeeper.znode.parent' = '/hbase'" + ")"); Table table = tableEnv.sqlQuery("select * from resume"); DataStream<Tuple2<Boolean, Row>> out = tableEnv.toRetractStream(table, Row.class); out.print(); env.execute(); }