HBase Sink error: UpsertStreamTableSink requires that Table has a full primary keys

HBase Sink error: UpsertStreamTableSink requires that Table has a full primary keys

xiao cai
Hi All:
When writing to an HBase sink with Flink SQL, I get the following error:
UpsertStreamTableSink requires that Table has a full primary keys if it is updated.


I created five tables in total: one Kafka source table, three HBase dimension tables, and one HBase sink table.
The result of left joining the Kafka source table with the HBase dimension tables is inserted into the HBase sink table.
The SQL is as follows:
create table user_click_source(
`id` bigint,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint,
`catalog_id` int,
`device_id` int,
`user_id` int,
`proc_time` timestamp(3),
PRIMARY KEY (id) NOT ENFORCED
)with(
'connector.type' = 'kafka',
……
)
;
create table dim_user(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>,
ts bigint
)with(
'connector.type'='hbase',
……
)
;


create table dim_device(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>
)with(
'connector.type'='hbase',
……
)
;


create table dim_catalog(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>
)with(
'connector.type'='hbase',
……
)
;
create table hbase_full_user_click_case1_sink(
`rowkey` bigint,
cf ROW<
`click_id` bigint,
`click_name` varchar,
`click_partition` int,
`click_event_time` bigint,
`click_write_time` bigint,
`click_snapshot_time` bigint,
`click_max_snapshot_time` bigint,
`catalog_id` int,
`catalog_name` varchar,
`catalog_partition` int,
`catalog_event_time` bigint,
`catalog_write_time` bigint,
`catalog_snapshot_time` bigint,
`catalog_max_snapshot_time` bigint,
`device_id` int,
`device_name` varchar,
`device_partition` int,
`device_event_time` bigint,
`device_write_time` bigint,
`device_snapshot_time` bigint,
`device_max_snapshot_time` bigint,
`user_id` int,
`user_name` varchar,
`user_partition` int,
`user_event_time` bigint,
`user_write_time` bigint,
`user_snapshot_time` bigint,
`user_max_snapshot_time` bigint
>,
PRIMARY KEY (rowkey) NOT ENFORCED
)with(
'connector.type'='hbase',
……
)
;
insert into hbase_full_user_click_case1_sink
select
`click_id`,
ROW(
`click_id`,
`click_name`,
`click_partition`,
`click_event_time`,
`click_write_time`,
`click_snapshot_time`,
`click_max_snapshot_time`,
`catalog_id`,
`catalog_name`,
`catalog_partition`,
`catalog_event_time`,
`catalog_write_time`,
`catalog_snapshot_time`,
`catalog_max_snapshot_time`,
`device_id`,
`device_name`,
`device_partition`,
`device_event_time`,
`device_write_time`,
`device_snapshot_time`,
`device_max_snapshot_time`,
`user_id`,
`user_name`,
`user_partition`,
`user_event_time`,
`user_write_time`,
`user_snapshot_time`,
`user_max_snapshot_time`
)
from (select
click.id as `click_id`,
click.name as `click_name`,
click.kafka_partition as `click_partition`,
click.event_time as `click_event_time`,
click.write_time as `click_write_time`,
click.snapshot_time as `click_snapshot_time`,
click.max_snapshot_time as `click_max_snapshot_time`,
cat.cf.id as `catalog_id`,
cat.cf.name as `catalog_name`,
cat.cf.kafka_partition as `catalog_partition`,
cat.cf.event_time as `catalog_event_time`,
cat.cf.write_time as `catalog_write_time`,
cat.cf.snapshot_time as `catalog_snapshot_time`,
cat.cf.max_snapshot_time as `catalog_max_snapshot_time`,
dev.cf.id as `device_id`,
dev.cf.name as `device_name`,
dev.cf.kafka_partition as `device_partition`,
dev.cf.event_time as `device_event_time`,
dev.cf.write_time as `device_write_time`,
dev.cf.snapshot_time as `device_snapshot_time`,
dev.cf.max_snapshot_time as `device_max_snapshot_time`,
u.cf.id as `user_id`,
u.cf.name as `user_name`,
u.cf.kafka_partition as `user_partition`,
u.cf.event_time as `user_event_time`,
u.cf.write_time as `user_write_time`,
u.cf.snapshot_time as `user_snapshot_time`,
u.cf.max_snapshot_time as `user_max_snapshot_time`


from (select
id,
`name`,
`kafka_partition`,
`event_time`,
`write_time`,
`snapshot_time`,
`max_snapshot_time`,
cast(catalog_id as varchar) as catalog_key,
cast(device_id as varchar) as device_key,
cast(user_id as varchar) as user_key,
`catalog_id`,
`device_id`,
`user_id`,
`proc_time`
FROM user_click_source
GROUP BY TUMBLE(event_time, INTERVAL '1' SECOND),
`id`,
`name`,
`kafka_partition`,
`event_time`,
`write_time`,
`snapshot_time`,
`max_snapshot_time`,
`catalog_id`,
`device_id`,
`user_id`,
`proc_time`) click


left join dim_catalog cat on click.catalog_key = cat.rowkey
left join dim_device dev on click.device_key = dev.rowkey
left join dim_user u on click.user_key = u.rowkey and click.event_time = u.ts
) t

Re: HBase Sink error: UpsertStreamTableSink requires that Table has a full primary keys

Jark
Administrator
The PK issue was fixed in 1.11. You can try the new HBase connector shipped in 1.11: it lets you declare the PK in the DDL, so the error no longer occurs even when the PK cannot be derived from the query.
See more:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
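
For reference, a minimal sketch of the new 1.11-style DDL (connector options per the 1.11 docs; the table name, quorum address, and column layout are placeholders):

create table hbase_sink (
  `rowkey` bigint,
  cf ROW<`click_id` bigint, `click_name` varchar>,
  PRIMARY KEY (`rowkey`) NOT ENFORCED  -- declared directly in the DDL, not derived from the query
) with (
  'connector' = 'hbase-1.4',               -- new connector identifier in 1.11
  'table-name' = 'hbase_full_user_click_case1_sink',
  'zookeeper.quorum' = 'localhost:2181'    -- placeholder address
);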


Best,
Jark


On Thu, 13 Aug 2020 at 14:27, xiao cai <[hidden email]> wrote:
> [quoted message trimmed]

Re: HBase Sink error: UpsertStreamTableSink requires that Table has a full primary keys

xiao cai
Hi Jark:
Thanks for the reply. I found the cause: in the join I intended to use HBase as a dimension table, but I had omitted the FOR SYSTEM_TIME AS OF clause; after adding it, the error no longer appears (see the sketch below).
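
For illustration, one of the joins above rewritten as a lookup (dimension) join, assuming proc_time is declared as a processing-time attribute on the source (e.g. `proc_time` AS PROCTIME()):

-- lookup join against the HBase dimension table, keyed on the rowkey
left join dim_catalog FOR SYSTEM_TIME AS OF click.proc_time as cat
on click.catalog_key = cat.rowkey
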
One more question: does the new HBase connector in 1.11 refer only to tables created with version 1.4 specified in the WITH clause? I found that version 1.4.3 also works. Does that mean the PK takes effect on both 1.4 and 1.4.3?
Thanks again.


Best
Xiao Cai

Sent from Mail for Windows 10

From: Jark Wu
Sent: 14 August 2020, 23:23
To: user-zh
Subject: Re: HBase Sink error: UpsertStreamTableSink requires that Table has a full primary keys

[quoted message trimmed]



Re: HBase Sink error: UpsertStreamTableSink requires that Table has a full primary keys

Jark
Administrator
The "new HBase connector" I mentioned above refers to the new sink connector implemented in the Flink repository; it works against both HBase server 1.4 and 1.4.3.
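
A minimal sketch, with placeholder table name and quorum: the WITH clause is identical whether the server runs 1.4 or 1.4.3:

create table dim_user_new (
  `rowkey` varchar,
  cf ROW<`id` int, `name` varchar>
) with (
  'connector' = 'hbase-1.4',               -- one identifier covering the HBase 1.4.x server line
  'table-name' = 'dim_user',               -- placeholder
  'zookeeper.quorum' = 'localhost:2181'    -- placeholder
);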

On Sat, 15 Aug 2020 at 00:05, xiao cai <[hidden email]> wrote:
> [quoted message trimmed]