支持。
insert into mysqlresult select k.vin,k.msgtime,d.brand_name from (SELECT
vin,max(msgtime) as msgtime,max(pts) as pts from kafkaSourceTable group by
TUMBLE(rowtime, INTERVAL '10' SECOND),vin) AS k left join msyqlDimTable FOR
SYSTEM_TIME AS OF k.pts AS d ON k.vin = d.vin
类似这样,先开10秒窗口获得kafka数据,然后join msyql维度表,然后插入mysql。
关键就是注意维度表lookup_cache_max-rows,lookup_cache_ttl这两个参数,设置维度表的更新时间。具体项目,具体对待,关键就是看看需要维度表支持多长时间的更新延迟。
另外,join维度表,目前应该只支持pts,不支持rowtime。
--
Sent from:
http://apache-flink.147419.n8.nabble.com/