1. Left and right source tables defined with the kafka connector and the json format:

    -- left table
    create table left_json (
         appl_seq string
        ,amount decimal(16,2)
        ,op_ts timestamp(3)
        ,watermark for op_ts as op_ts - interval '60' second
    ) with (
         'connector' = 'kafka'
        ,'topic' = 'left-json'
        ,'value.format' = 'json'
        ,'scan.startup.mode' = 'latest-offset'
        ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
        ,'properties.group.id' = 'left-json-group'
    )

    -- right table
    create table right_json (
         appl_seq string
        ,amount decimal(16,2)
        ,op_ts timestamp(3)
        ,watermark for op_ts as op_ts - interval '60' second
    ) with (
         'connector' = 'kafka'
        ,'topic' = 'right-json'
        ,'value.format' = 'json'
        ,'scan.startup.mode' = 'latest-offset'
        ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
        ,'properties.group.id' = 'right-json-group'
    )

2. Left and right source tables defined with upsert-kafka:

    -- left table
    create table left_upsert (
         appl_seq string
        ,amount decimal(16,2)
        ,op_ts timestamp(3)
        ,primary key(appl_seq) not enforced
        ,watermark for op_ts as op_ts - interval '60' second
    ) with (
         'connector' = 'upsert-kafka'
        ,'topic' = 'left-upsert'
        ,'key.format' = 'json'
        ,'value.format' = 'json'
        ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
        ,'properties.group.id' = 'left-upsert-group'
    )

    -- right table
    create table right_upsert (
         appl_seq string
        ,amount decimal(16,2)
        ,op_ts timestamp(3)
        ,primary key(appl_seq) not enforced
        ,watermark for op_ts as op_ts - interval '60' second
    ) with (
         'connector' = 'upsert-kafka'
        ,'topic' = 'right-upsert'
        ,'key.format' = 'json'
        ,'value.format' = 'json'
        ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
        ,'properties.group.id' = 'right-upsert-group'
    )

I tested a regular join on each combination:

1.
    select *
    from left_json
    join right_json
      on left_json.appl_seq = right_json.appl_seq

2.
    select *
    from left_upsert
    join right_upsert
      on left_upsert.appl_seq = right_upsert.appl_seq

3.
    select *
    from left_upsert
    join right_json
      on left_upsert.appl_seq = right_json.appl_seq

Error:
Rowtime attributes must not be in the input rows of a regular join.

Then I tested an interval join on each combination:

1.
    select *
    from left_json
    join right_json
      on left_json.appl_seq = right_json.appl_seq
     and right_json.op_ts >= left_json.op_ts
     and right_json.op_ts <= left_json.op_ts + interval '1' minute

2.
    select *
    from left_upsert
    join right_upsert
      on left_upsert.appl_seq = right_upsert.appl_seq
     and right_upsert.op_ts >= left_upsert.op_ts
     and right_upsert.op_ts <= left_upsert.op_ts + interval '1' minute

3.
    select *
    from left_upsert
    join right_json
      on left_upsert.appl_seq = right_json.appl_seq
     and right_json.op_ts >= left_upsert.op_ts
     and right_json.op_ts <= left_upsert.op_ts + interval '1' minute

Error:
Rowtime attributes must not be in the input rows of a regular join.

However, in 雪尽's talk on widening real-time data, an interval join is said to buffer the data of inputs that define a delayed watermark.

Also, how should I understand the statement that an interval join's "output stream retains the time attribute"?

And how should I understand the role of the watermark in regular and interval joins? In my experiments, both join types only kept in state the most recently arrived row for each pk.
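For the regular-join error quoted above, one workaround that is commonly suggested (and, as far as I recall, hinted at by the full text of that error message in some Flink versions) is to cast the rowtime attribute to a plain TIMESTAMP before the join, so that the join inputs no longer carry a time attribute. The sketch below assumes the left_json and right_json tables from this post; the view names left_json_plain and right_json_plain and the output column aliases are hypothetical and only for illustration. It does not apply to the interval join, which needs the time attributes on both inputs.

```sql
-- Hypothetical views: cast op_ts so it becomes an ordinary TIMESTAMP(3)
-- column instead of a rowtime attribute.
create view left_json_plain as
select
     appl_seq
    ,amount
    ,cast(op_ts as timestamp(3)) as op_ts
from left_json;

create view right_json_plain as
select
     appl_seq
    ,amount
    ,cast(op_ts as timestamp(3)) as op_ts
from right_json;

-- Regular join over the views; neither input carries a rowtime attribute.
select
     l.appl_seq
    ,l.amount as left_amount
    ,l.op_ts  as left_op_ts
    ,r.amount as right_amount
    ,r.op_ts  as right_op_ts
from left_json_plain as l
join right_json_plain as r
  on l.appl_seq = r.appl_seq;
```

Because op_ts is no longer a time attribute after the cast, the result of this query cannot be used directly in downstream event-time operations such as windows or interval joins.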