Hi,
I created a temporary table (view) with the following API:
val oStream: DataStream[Order] = env.addSource(new BugSource)
tEnv.createTemporaryView("t_order", oStream, 'order_id, 'order_price,
'order_time)
I then created a new catalog and database, and a table in it, as follows:
val catalog =
"""
|create catalog hive_catalog with (
| 'type'='iceberg',
| 'catalog-type'='hive',
| 'uri'='thrift://hostname:9083',
| 'clients'='5',
| 'property-version'='1',
| 'warehouse'='hdfs://hostname:8020/hive'
|)
|""".stripMargin
tEnv.executeSql(catalog)
tEnv.useCatalog("hive_catalog")
val database =
"""
|CREATE DATABASE iceberg_db
|""".stripMargin
tEnv.executeSql(database)
tEnv.useDatabase("iceberg_db")
val ice_order =
"""
|create table ice_t_order(
|order_id string,
|order_price decimal(10,2),
|order_time timestamp(3)
|)
|""".stripMargin
tEnv.executeSql(ice_order)
val query =
"""
|insert into ice_t_order
|select * from t_order
|""".stripMargin
tEnv.executeSql(query)
When I run this, it throws an exception saying that the table t_order cannot be found:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 't_order' not found
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
    ... 27 more
In a case like this, what do I need to do so that a temporary table created earlier can be used after switching to the new catalog?
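One thing I suspect, but have not verified: the temporary view was registered while Flink's built-in `default_catalog` and `default_database` were still current, so the unqualified name `t_order` no longer resolves after `useCatalog` and `useDatabase`. A minimal sketch of referencing it by its fully qualified name, assuming those default names were not overridden in `EnvironmentSettings` (`qualifiedQuery` is just an illustrative name; `tEnv` is the table environment from the code above):

```scala
// Assumption: t_order was registered under default_catalog.default_database,
// i.e. before tEnv.useCatalog("hive_catalog") was called.
val qualifiedQuery =
  """
    |insert into ice_t_order
    |select * from default_catalog.default_database.t_order
    |""".stripMargin
// ice_t_order still resolves against the current catalog and database
// (hive_catalog.iceberg_db); only the source view is fully qualified.
tEnv.executeSql(qualifiedQuery)
```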
I would appreciate it if anyone who knows could point me in the right direction. Thanks!
Best regards!