This post was updated on .
CONTENTS DELETED
The author has deleted this message.
|
Administrator
|
Hi,
目前 Flink SQL 在插入数据到数据库时,要求 query 的 key 与结果表的 key 相同。这里 HBase 的 key 一直都是 rowkey,但是 query 的 key 丢失了(concat_ws 丢失了 key 属性),因此需要直接 group by concat_ws(..),才能获得 key 且对应上 HBase 的 rowkey。所以你的 query 需要改成这样: insert into resume01 select age_name,ROW(age,mobile) from ( select CONCAT_WS('_',age,name) as age_name,sum(cast(mobile as bigint)) as mobile from source_resume group by CONCAT_WS('_',age,name) ) as tt Best, Jark On Mon, 9 Mar 2020 at 21:03, [hidden email] <[hidden email]> wrote: > 各位好, > 最近在研究Flink Hbase连接器,测试实验是将聚合的数据写入到hbase报错。希望能得到各位的帮助。代码 如下: > /** > * @Author: ellis.guan > * @Description: HBase测试类 > * @Date: 2020/3/6 15:41 > * @Version: 1.0 > */ > public class HbaseTest { > private StreamExecutionEnvironment env; > private StreamTableEnvironment tableEnv; > > @Before > public void init(){ > env=StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > tableEnv = StreamTableEnvironment.create(env, settings); > tableEnv.sqlUpdate("create table resume01(\n" + > " `rowkey` string,sdp_columns_family ROW<age string,mobile > BIGINT> \n" + > // " `binfo` ROW<age string,mobile string,site string>,\n" + > // " edu ROW<university string>, \n" + > // " work ROW<company1 string> \n" + > ") with (" + > " 'connector.type' = 'hbase', " + > " 'connector.version' = '1.4.3', " + > " 'connector.table-name' = 'resume01'," + > " 'connector.zookeeper.quorum' = 'localhost:2181'," + > " 'connector.zookeeper.znode.parent' = '/hbase'" + > ")"); > } > @Test > public void testReadFromHBase() throws Exception { > // HBaseTableSource resume = new HBaseTableSource(); > Table table = tableEnv.sqlQuery("select * from resume"); > DataStream<Tuple2<Boolean, Row>> out = > tableEnv.toRetractStream(table, Row.class); > out.print(); > env.execute(); > } > > @Test > public void testWriterToHBase() throws Exception { > DataStream<Row> source = env.fromElements( > 
Row.of("ellis","2015-03-27","17352837822","changsha","hun > nan","shiji"), > Row.of("ellis","2015-03-28","17352837825","changsha1","hun > nan","shiji"), > > Row.of("ellis","2015-03-279","17352837826","changsha2","hun nan","shiji")); > > tableEnv.createTemporaryView("source_resume",source,"name,age,mobile,site,university,company1"); > tableEnv.sqlUpdate("insert into resume01 select > CONCAT_WS('_',age,name),ROW(age,mobile) from " + > " (select name,age,sum(cast(mobile as bigint)) as mobile > from source_resume group by name,age ) as tt"); > env.execute(); > } > } > > 运行报错如下: > org.apache.flink.table.api.TableException: UpsertStreamTableSink requires > that Table has a full primary keys if it is updated. > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > at com.shiji.sdp.flink.HbaseTest.testWriterToHBase(HbaseTest.java:59) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59) > at > org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98) > at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79) > at > org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87) > at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77) > at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42) > at > org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88) > at > org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51) > at > org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44) > at > org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27) > at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37) > at > org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42) > at org.junit.runner.JUnitCore.run(JUnitCore.java:130) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > > > > [hidden 
email] > |
Administrator
|
In reply to this post by guanxianchun
Yes, the age in the ROW(...) expression is invalid; you should extract the age
from age_name. Try the following query: insert into resume01 select age_name,ROW(SUBSTR(age_name, 0, INSTR(age_name, '_') - 1),mobile) from ( select CONCAT_WS('_',age,name) as age_name,sum(cast(mobile as bigint)) as mobile from source_resume group by name,age ) as tt On Tue, 10 Mar 2020 at 10:46, <[hidden email]> wrote: > 您好,感谢您的回复,按照你写的sql 我运行了一下,报age列不存在表中: > String sql = "insert into resume01 \n" + > " select age_name,ROW(age,mobile)\n" + > " from (\n" + > " select CONCAT_WS('_',age,name) as > age_name,sum(cast(mobile as bigint)) as mobile \n" + > " from source_resume group by > CONCAT_WS('_',age,name) \n" + > " ) as tt "; > tableEnv.sqlUpdate(sql); > 运行后错误如下: > org.apache.flink.table.api.ValidationException: SQL validation failed. > From line 2, column 31 to line 2, column 33: Column 'age' not found in any > table > > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > at > com.shiji.sdp.flink.HbaseTest.testWriterToHBase(HbaseTest.java:75) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) > at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59) > at > org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98) > at > org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79) > at > org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87) > at > org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77) > at > org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42) > at > org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88) > at > org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51) > at > org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44) > at > org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27) > at > org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37) > at > org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42) > at org.junit.runner.JUnitCore.run(JUnitCore.java:130) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line > 2, column 31 to line 2, column 33: Column 'age' not found in any table > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > > > > String sql = "insert into resume01 \n" + > 
" select age_name,ROW(age,mobile)\n" + > " from (\n" + > " select CONCAT_WS('_',age,name) as > age_name,sum(cast(mobile as bigint)) as mobile,age as age \n" + > " from source_resume group by > CONCAT_WS('_',age,name),age \n" + > " ) as tt "; > tableEnv.sqlUpdate(sql); > > 加上age后提交原来的错误: > org.apache.flink.table.api.TableException: UpsertStreamTableSink requires > that Table has a full primary keys if it is updated. > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > at > 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > at > com.shiji.sdp.flink.HbaseTest.testWriterToHBase(HbaseTest.java:75) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59) > at > org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98) > at > org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79) > at > org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87) > at > org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77) > at > org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42) > at > org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88) > at > org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51) > at > org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44) > at > org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27) > at > org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37) > at > org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42) > at org.junit.runner.JUnitCore.run(JUnitCore.java:130) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) > > 
_____________________________________ > Sent from http://apache-flink.147419.n8.nabble.com > > |
Free forum by Nabble | Edit this page |