In Flink SQL, how do I convert ROW<`age` INT> into INT?
streamTableEnv.sqlUpdate(
  """
    |insert into user_age
    |SELECT rowkey, cast(cf as int) as age
    |FROM
    |  users
    |""".stripMargin)

I tried this and it failed with an error.
You can't cast it directly. ROW is a composite type; to access one of its fields, use the `.` operator.
For your case, that would be:

    SELECT rowkey, cf.age FROM users
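Plugged back into the statement from the question, a minimal sketch (assuming the user_age sink and the users HBase table are registered exactly as in the original snippet):

    streamTableEnv.sqlUpdate(
      """
        |insert into user_age
        |SELECT rowkey, cf.age AS age
        |FROM users
        |""".stripMargin)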
2020-06-16 21:01:09,756 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka version: unknown
2020-06-16 21:01:09,757 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: unknown
2020-06-16 21:01:09,758 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-7, groupId=null] Subscribed to partition(s): user_behavior-0
2020-06-16 21:01:09,765 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata - Cluster ID: cAT_xBISQNWghT9kR5UuIw
2020-06-16 21:01:09,766 WARN org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
2020-06-16 21:01:09,766 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka version: unknown
2020-06-16 21:01:09,767 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: unknown
2020-06-16 21:01:09,768 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-7, groupId=null] Resetting offset for partition user_behavior-0 to offset 43545.
2020-06-16 21:01:35,904 INFO org.apache.flink.addons.hbase.HBaseLookupFunction - start close ...
2020-06-16 21:01:35,906 INFO org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient - Close zookeeper connection 0x72d39885 to cdh1:2181,cdh2:2181,cdh3:2181
2020-06-16 21:01:35,908 INFO org.apache.flink.addons.hbase.HBaseLookupFunction - end close.
2020-06-16 21:01:35,908 INFO org.apache.zookeeper.ZooKeeper - Session: 0x172b776fac80ae4 closed
2020-06-16 21:01:35,909 INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
2020-06-16 21:01:35,911 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(uid, phoneType, clickCount, time) -> SourceConversion(table=[default_catalog.default_database.user_behavior, source: [KafkaTableSource(uid, phoneType, clickCount, time)]], fields=[uid, phoneType, clickCount, time]) -> Calc(select=[uid, time]) -> LookupJoin(table=[HBaseTableSource[schema=[rowkey, cf], projectFields=null]], joinType=[InnerJoin], async=[false], lookup=[rowkey=uid], select=[uid, time, rowkey, cf]) -> Calc(select=[CAST(time) AS time, cf.age AS age]) -> SinkConversionToTuple2 -> Sink: JDBCUpsertTableSink(time, age) (1/2) (e45989f173dc35aefc52413349db7f30) switched from RUNNING to FAILED.
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 2
    at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:838)
    at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:1004)
    at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:980)
    at org.apache.flink.addons.hbase.util.HBaseTypeUtils.deserializeToObject(HBaseTypeUtils.java:55)
    at org.apache.flink.addons.hbase.util.HBaseReadWriteHelper.parseToRow(HBaseReadWriteHelper.java:158)
    at org.apache.flink.addons.hbase.HBaseLookupFunction.eval(HBaseLookupFunction.java:78)
    at LookupFunction$12.flatMap(Unknown Source)
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at StreamExecCalc$7.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at SourceConversion$6.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)

Query:

    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "cdh1:2181,cdh2:2181,cdh3:2181")
    hConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")

    val users = new HBaseTableSource(hConf, "user_hbase5")
    users.setRowKey("rowkey", classOf[String]) // rowkey as the primary key
    users.addColumn("cf", "age", classOf[Integer])
    streamTableEnv.registerTableSource("users", users)

    streamTableEnv.sqlUpdate(
      """
        |insert into time_age
        |SELECT
        |  cast(b.`time` as string) as `time`, u.cf.age
        |FROM
        |  (select *, PROCTIME() AS proctime from user_behavior) AS b
        |  JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
        |  ON b.uid = u.rowkey
        |""".stripMargin)

Given the error message "offset (0) + length (4) exceed the capacity of the array: 2": is mapping the int value read from HBase with users.addColumn("cf", "age", classOf[Integer]) the wrong way to convert it? How do I convert int to Integer, or Integer to int?
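For reference, a minimal sketch of what that error implies at the byte level. It assumes (only a guess from the message) that the HBase cell was written as a 2-byte value, e.g. the string "18", rather than a 4-byte int; the object name BytesToIntSketch and the literal values are made up for illustration:

    import org.apache.hadoop.hbase.util.Bytes

    object BytesToIntSketch extends App {
      // The stack trace shows HBaseTypeUtils decoding the Integer column via Bytes.toInt,
      // which needs a 4-byte cell value.
      val writtenAsInt = Bytes.toBytes(18)      // 4 bytes
      println(Bytes.toInt(writtenAsInt))        // 18 -> decodes fine

      // A cell written as the string "18" holds only 2 bytes, so Bytes.toInt fails with
      // "offset (0) + length (4) exceed the capacity of the array: 2", as in the trace above.
      val writtenAsString = Bytes.toBytes("18") // 2 bytes
      println(writtenAsString.length)           // 2
      // Bytes.toInt(writtenAsString)           // would throw java.lang.IllegalArgumentException
    }

If that is what happened, the Integer vs. int choice in addColumn is probably not the issue; the age cell would need to be written as a 4-byte int (Bytes.toBytes on an Int), or the column declared as a String and cast in SQL.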