Question about watermark settings for regular join and interval join


yanchenyun
1. The left and right source tables, defined with the kafka connector and json format:
-- left table
create table left_json(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts      timestamp(3)
    ,watermark for op_ts as op_ts - interval '60' second
) with (
    'connector' = 'kafka'
    ,'topic' = 'left-json'
    ,'value.format' = 'json'
    ,'scan.startup.mode' = 'latest-offset'    
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'    
    ,'properties.group.id' = 'left-json-group'
)

-- right table
create table right_json(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts      timestamp(3)
    ,watermark for op_ts as op_ts - interval '60' second
) with (
    'connector' = 'kafka'
    ,'topic' = 'right-json'
    ,'value.format' = 'json'
    ,'scan.startup.mode' = 'latest-offset'
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'    
    ,'properties.group.id' = 'right-json-group'
)

2. The left and right source tables, defined with the upsert-kafka connector:
-- left table
create table left_upsert(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts    timestamp(3)
    ,primary key(appl_seq) not enforced
    ,watermark for op_ts as op_ts - interval '60' second
) with (
    'connector' = 'upsert-kafka'
    ,'topic' = 'left-upsert'
    ,'key.format' = 'json'
    ,'value.format' = 'json'
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'    
    ,'properties.group.id' = 'left-upsert-group'
)

-- right table
create table right_upsert(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts    timestamp(3)
    ,primary key(appl_seq) not enforced
    ,watermark for op_ts as op_ts - interval '60' second
) with (
    'connector' = 'upsert-kafka'
    ,'topic' = 'right-upsert'
    ,'key.format' = 'json'
    ,'value.format' = 'json'
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'    
    ,'properties.group.id' = 'right-upsert-group'
)

I ran a regular join test on each combination:
1. select * from left_json join right_json on left_json.appl_seq = right_json.appl_seq
2. select * from left_upsert join right_upsert on left_upsert.appl_seq = right_upsert.appl_seq
3. select * from left_upsert join right_json on left_upsert.appl_seq = right_json.appl_seq

Error:
Rowtime attributes must not be in the input rows of a regular join.
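
For reference, the full text of this Flink error goes on to suggest casting the time attributes of the input tables to TIMESTAMP beforehand. A minimal sketch of that workaround against the left_json and right_json tables above; the aliases l and r and the output column names are illustrative assumptions, not part of my original test:

```sql
-- Sketch: cast op_ts to a plain TIMESTAMP(3) in each input,
-- so it is no longer a rowtime attribute when it reaches the regular join.
select
    l.appl_seq
    ,l.amount as left_amount
    ,l.op_ts  as left_op_ts
    ,r.amount as right_amount
    ,r.op_ts  as right_op_ts
from (
    select appl_seq, amount, cast(op_ts as timestamp(3)) as op_ts
    from left_json
) l
join (
    select appl_seq, amount, cast(op_ts as timestamp(3)) as op_ts
    from right_json
) r
on l.appl_seq = r.appl_seq
```

After the cast, op_ts is an ordinary timestamp column, so it can no longer drive downstream event-time operations such as windows.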

Then I ran an interval join test on each combination:
1. select * from left_json join right_json on left_json.appl_seq = right_json.appl_seq
and right_json.op_ts >= left_json.op_ts
and right_json.op_ts <= left_json.op_ts + interval '1' minute
2. select * from left_upsert join right_upsert on left_upsert.appl_seq = right_upsert.appl_seq
and right_upsert.op_ts >= left_upsert.op_ts
and right_upsert.op_ts <= left_upsert.op_ts + interval '1' minute
3. select * from left_upsert join right_json on left_upsert.appl_seq = right_json.appl_seq
and right_json.op_ts >= left_upsert.op_ts
and right_json.op_ts <= left_upsert.op_ts + interval '1' minute

Error:
Rowtime attributes must not be in the input rows of a regular join.


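However, in 雪尽's talk on widening real-time data, it is said that:
an interval join does buffer data according to the watermark that was defined with a delay.
Also, how should I understand the statement that an interval join's "output stream retains the time attribute"?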
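
To make the "retains the time attribute" question concrete, here is a sketch of what I understand it to mean: a rowtime column coming out of an interval join could still be used by a downstream event-time window. It assumes the planner actually recognizes the inner query as an interval join on the append-only left_json and right_json tables; the alias j and the column names left_op_ts and matched_rows are hypothetical:

```sql
-- Sketch: a tumbling window on top of an interval join result.
-- This is only valid if left_op_ts is still a rowtime attribute after the join.
select
    tumble_start(left_op_ts, interval '1' minute) as window_start
    ,count(*) as matched_rows
from (
    select l.appl_seq, l.op_ts as left_op_ts
    from left_json l
    join right_json r
    on l.appl_seq = r.appl_seq
    and r.op_ts >= l.op_ts
    and r.op_ts <= l.op_ts + interval '1' minute
) j
group by tumble(left_op_ts, interval '1' minute)
```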

How should I understand the role of the watermark in regular and interval joins? In my tests, both kinds of join only seem to keep in state the most recent row for each primary key.
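
As far as I understand, state for a regular join is not cleaned up by the watermark at all and is only bounded by an idle-state TTL, whereas an interval join is supposed to drop rows once the watermark passes the time bound. A sketch of the setting I would use to bound regular join state; the value '1 h' is an arbitrary assumption, and the quoted SET syntax is the one accepted by recent SQL client versions:

```sql
-- Sketch: bound the state kept by a regular join with an idle-state TTL.
-- '1 h' is an illustrative value, not something taken from my tests.
SET 'table.exec.state.ttl' = '1 h';
```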