Deduplication method for changelog-json


DovEゞ
Hi, good afternoon everyone.
A scenario came up during development:
get a given member's earliest store-visit record for the current day.
The SQL I would normally write is:
select t0.memberid, t0.shop_time, t0.workerid, t0.resource_type, t0.proctime
from(
    select memberid, shop_time, workerid, resource_type, proctime
    from inviteShop
    where shop_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
) t0
inner join
(
    select memberid, min(shop_time) as shop_time
    from inviteShop
    where shop_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
    group by memberid
) t1 on t0.memberid = t1.memberid and t0.shop_time = t1.shop_time 



I found the deduplication method provided in the official documentation, but inviteShop is currently built as a Kafka table in changelog-json format. Judging from the error, it seems this format is not supported. Is there another way to solve this?
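
For context, a minimal sketch of what the inviteShop DDL presumably looks like. The schema is inferred from the query above, the column types are guesses, and the topic and bootstrap-server values are placeholders; the relevant part is the 'format' = 'changelog-json' option, which makes the source emit update and delete changes:

CREATE TABLE inviteShop (
    memberid       STRING,
    shop_time      STRING,
    workerid       STRING,
    resource_type  STRING,
    proctime AS PROCTIME()   -- processing-time attribute
) WITH (
    'connector' = 'kafka',
    'topic' = 'invite_shop',                        -- placeholder
    'properties.bootstrap.servers' = 'kafka:9092',  -- placeholder
    'format' = 'changelog-json'                     -- changelog (update/delete) source
);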
The deduplication pattern from the official docs:

SELECT [column_list]
FROM (
    SELECT [column_list],
        ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
            ORDER BY time_attr [asc|desc]) AS rownum
    FROM table_name
) WHERE rownum = 1

The error:

org.apache.flink.table.api.TableException: Deduplicate doesn't support consuming update and delete changes which is produced by node TableSourceScan

Best
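
For reference, a sketch of how the documented pattern might be instantiated against inviteShop, with column names taken from the queries above and proctime assumed to be the processing-time attribute. On an insert-only source this plans as a Deduplicate (keep-first-row) node; on a changelog-json source it is the kind of plan that raises the exception above:

-- Hypothetical instantiation of the documented deduplication pattern.
SELECT memberid, shop_time, workerid, resource_type, proctime
FROM (
    SELECT memberid, shop_time, workerid, resource_type, proctime,
        ROW_NUMBER() OVER (PARTITION BY memberid
            ORDER BY proctime ASC) AS rownum
    FROM inviteShop
    WHERE shop_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
) WHERE rownum = 1

Note that this keeps the first row processed per memberid, which only matches the earliest shop_time if records arrive roughly in shop_time order.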