Hi All:

When writing to an HBase sink with Flink SQL, I get the following error:
UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

I created five tables in total: one Kafka source table, three HBase dimension tables, and one HBase sink table. The result of left-joining the Kafka source table with the HBase dimension tables is inserted into the HBase sink table. The SQL is as follows:

create table user_click_source (
    `id` bigint,
    `name` varchar,
    `kafka_partition` int,
    `event_time` bigint,
    `write_time` bigint,
    `snapshot_time` bigint,
    `max_snapshot_time` bigint,
    `catalog_id` int,
    `device_id` int,
    `user_id` int,
    `proc_time` timestamp(3),
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector.type' = 'kafka',
    ……
);

create table dim_user (
    `rowkey` varchar,
    cf ROW<
        `id` int,
        `name` varchar,
        `kafka_partition` int,
        `event_time` bigint,
        `write_time` bigint,
        `snapshot_time` bigint,
        `max_snapshot_time` bigint
    >,
    ts bigint
) with (
    'connector.type' = 'hbase',
    ……
);

create table dim_device (
    `rowkey` varchar,
    cf ROW<
        `id` int,
        `name` varchar,
        `kafka_partition` int,
        `event_time` bigint,
        `write_time` bigint,
        `snapshot_time` bigint,
        `max_snapshot_time` bigint
    >
) with (
    'connector.type' = 'hbase',
    ……
);

create table dim_catalog (
    `rowkey` varchar,
    cf ROW<
        `id` int,
        `name` varchar,
        `kafka_partition` int,
        `event_time` bigint,
        `write_time` bigint,
        `snapshot_time` bigint,
        `max_snapshot_time` bigint
    >
) with (
    'connector.type' = 'hbase',
    ……
);

create table hbase_full_user_click_case1_sink (
    `rowkey` bigint,
    cf ROW<
        `click_id` bigint,
        `click_name` varchar,
        `click_partition` int,
        `click_event_time` bigint,
        `click_write_time` bigint,
        `click_snapshot_time` bigint,
        `click_max_snapshot_time` bigint,
        `catalog_id` int,
        `catalog_name` varchar,
        `catalog_partition` int,
        `catalog_event_time` bigint,
        `catalog_write_time` bigint,
        `catalog_snapshot_time` bigint,
        `catalog_max_snapshot_time` bigint,
        `device_id` int,
        `device_name` varchar,
        `device_partition` int,
        `device_event_time` bigint,
        `device_write_time` bigint,
        `device_snapshot_time` bigint,
        `device_max_snapshot_time` bigint,
        `user_id` int,
        `user_name` varchar,
        `user_partition` int,
        `user_event_time` bigint,
        `user_write_time` bigint,
        `user_snapshot_time` bigint,
        `user_max_snapshot_time` bigint
    >,
    PRIMARY KEY (rowkey) NOT ENFORCED
) with (
    'connector.type' = 'hbase',
    ……
);

insert into hbase_full_user_click_case1_sink
select
    `click_id`,
    ROW(
        `click_id`, `click_name`, `click_partition`, `click_event_time`,
        `click_write_time`, `click_snapshot_time`, `click_max_snapshot_time`,
        `catalog_id`, `catalog_name`, `catalog_partition`, `catalog_event_time`,
        `catalog_write_time`, `catalog_snapshot_time`, `catalog_max_snapshot_time`,
        `device_id`, `device_name`, `device_partition`, `device_event_time`,
        `device_write_time`, `device_snapshot_time`, `device_max_snapshot_time`,
        `user_id`, `user_name`, `user_partition`, `user_event_time`,
        `user_write_time`, `user_snapshot_time`, `user_max_snapshot_time`
    )
from (
    select
        click.id as `click_id`,
        click.name as `click_name`,
        click.kafka_partition as `click_partition`,
        click.event_time as `click_event_time`,
        click.write_time as `click_write_time`,
        click.snapshot_time as `click_snapshot_time`,
        click.max_snapshot_time as `click_max_snapshot_time`,
        cat.cf.id as `catalog_id`,
        cat.cf.name as `catalog_name`,
        cat.cf.kafka_partition as `catalog_partition`,
        cat.cf.event_time as `catalog_event_time`,
        cat.cf.write_time as `catalog_write_time`,
        cat.cf.snapshot_time as `catalog_snapshot_time`,
        cat.cf.max_snapshot_time as `catalog_max_snapshot_time`,
        dev.cf.id as `device_id`,
        dev.cf.name as `device_name`,
        dev.cf.kafka_partition as `device_partition`,
        dev.cf.event_time as `device_event_time`,
        dev.cf.write_time as `device_write_time`,
        dev.cf.snapshot_time as `device_snapshot_time`,
        dev.cf.max_snapshot_time as `device_max_snapshot_time`,
        u.cf.id as `user_id`,
        u.cf.name as `user_name`,
        u.cf.kafka_partition as `user_partition`,
        u.cf.event_time as `user_event_time`,
        u.cf.write_time as `user_write_time`,
        u.cf.snapshot_time as `user_snapshot_time`,
        u.cf.max_snapshot_time as `user_max_snapshot_time`
    from (
        select
            `id`,
            `name`,
            `kafka_partition`,
            `event_time`,
            `write_time`,
            `snapshot_time`,
            `max_snapshot_time`,
            cast(catalog_id as varchar) as catalog_key,
            cast(device_id as varchar) as device_key,
            cast(user_id as varchar) as user_key,
            `catalog_id`,
            `device_id`,
            `user_id`,
            `proc_time`
        from user_click_source
        group by TUMBLE(event_time, INTERVAL '1' SECOND),
            `id`, `name`, `kafka_partition`, `event_time`, `write_time`,
            `snapshot_time`, `max_snapshot_time`, `catalog_id`, `device_id`,
            `user_id`, `proc_time`
    ) click
    left join dim_catalog cat on click.catalog_key = cat.rowkey
    left join dim_device dev on click.device_key = dev.rowkey
    left join dim_user u on click.user_key = u.rowkey and click.event_time = u.ts
) t;
The PK problem has already been solved in 1.11. You can use the new HBase connector shipped with 1.11: it lets you declare the PK in the DDL, so the job no longer fails even when a PK cannot be derived from the query.
See more: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
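For reference, a sink DDL for the 1.11 connector looks roughly like the sketch below; the table name, columns and connection options are illustrative placeholders rather than values from this thread:

    CREATE TABLE hbase_sink (
        rowkey BIGINT,
        cf ROW<click_id BIGINT, click_name STRING>,
        -- PK declared directly in the DDL; it maps to the HBase row key
        PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
        'connector' = 'hbase-1.4',            -- new-style connector identifier (replaces 'connector.type')
        'table-name' = 'full_user_click',     -- hypothetical HBase table name
        'zookeeper.quorum' = 'zk-host:2181'   -- hypothetical ZooKeeper quorum
    );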
Best,
Jark
Hi Jark:
Thanks for the reply. It turned out that I meant to use the HBase tables as dimension (lookup) tables in the join, but I had left out the FOR SYSTEM_TIME AS OF clause; after adding it the error no longer appears.
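The corrected join now looks roughly like this (a sketch of just the dim_catalog join; it assumes proc_time is exposed as a processing-time attribute, which is not spelled out in the DDL above):

    left join dim_catalog FOR SYSTEM_TIME AS OF click.proc_time AS cat
        on click.catalog_key = cat.rowkey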
One more question: does the new HBase connector in 1.11 only refer to tables created with version 1.4 specified in the WITH clause? I found that version 1.4.3 also works fine. Does that mean the PK takes effect on both 1.4 and 1.4.3?
Thanks again.

Best
Xiao Cai
The "new HBase connector" I mentioned above refers to the new sink connector implemented in the Flink repository; it works with both HBase server 1.4 and 1.4.3.