flink1.11 cdc使用

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

flink1.11 cdc使用

Dream-底限
hi
我这面想使用flinkcdc做实时etl,我看可以做到维表(时态表)关联,现在想问一下能在cdc功能中用聚合算子嘛,全局groupby或窗口函数
Reply | Threaded
Open this post in threaded view
|

Re: flink1.11 cdc使用

china_tao
支持。
insert into mysqlresult select k.vin,k.msgtime,d.brand_name from (SELECT
vin,max(msgtime) as msgtime,max(pts) as pts from kafkaSourceTable  group by
TUMBLE(rowtime, INTERVAL '10' SECOND),vin) AS k left join msyqlDimTable  FOR
SYSTEM_TIME AS OF k.pts AS d ON k.vin = d.vin

类似这样,先开10秒窗口获得kafka数据,然后join msyql维度表,然后插入mysql。
关键就是注意维度表lookup_cache_max-rows,lookup_cache_ttl这两个参数,设置维度表的更新时间。具体项目,具体对待,关键就是看看需要维度表支持多长时间的更新延迟。
另外,join维度表,目前应该只支持pts,不支持rowtime。



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink1.11 cdc使用

Dream-底限
好的,感谢

china_tao <[hidden email]> 于2020年8月24日周一 下午12:21写道:

> 支持。
> insert into mysqlresult select k.vin,k.msgtime,d.brand_name from (SELECT
> vin,max(msgtime) as msgtime,max(pts) as pts from kafkaSourceTable  group by
> TUMBLE(rowtime, INTERVAL '10' SECOND),vin) AS k left join msyqlDimTable
> FOR
> SYSTEM_TIME AS OF k.pts AS d ON k.vin = d.vin
>
> 类似这样,先开10秒窗口获得kafka数据,然后join msyql维度表,然后插入mysql。
>
> 关键就是注意维度表lookup_cache_max-rows,lookup_cache_ttl这两个参数,设置维度表的更新时间。具体项目,具体对待,关键就是看看需要维度表支持多长时间的更新延迟。
> 另外,join维度表,目前应该只支持pts,不支持rowtime。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>