如何定义时态表

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

如何定义时态表

superainbower


Hi,请教下大家,关于Temporal Tables,官方文档中的定义方法是
-- 定义一张版本表CREATETABLEproduct_changelog(product_idSTRING,product_nameSTRING,product_priceDECIMAL(10,4),update_timeTIMESTAMP(3)METADATAFROM'value.source.timestamp'VIRTUAL,PRIMARYKEY(product_id)NOTENFORCED,-- (1) 定义主键约束WATERMARKFORupdate_timeASupdate_time-- (2) 通过 watermark 定义事件时间              )WITH('connector'='kafka','topic'='products','scan.startup.mode'='earliest-offset','properties.bootstrap.servers'='localhost:9092','value.format'='debezium-json');
这里是debezium-json,是否可以不用debzium做cdc,利用canal呢?
我尝试了替换'value.format'='canal-json’ 会提示value.source.timestamp 这个在metadata中没有,只有timestamp
-- 定义一张版本表CREATETABLEproduct_changelog(product_idSTRING,product_nameSTRING,product_priceDECIMAL(10,4),update_timeTIMESTAMP(3)METADATAFROM'timestamp'VIRTUAL,PRIMARYKEY(product_id)NOTENFORCED,-- (1) 定义主键约束WATERMARKFORupdate_timeASupdate_time-- (2) 通过 watermark 定义事件时间              )WITH('connector'='kafka','topic'='products','scan.startup.mode'='earliest-offset','properties.bootstrap.servers'='localhost:9092','value.format'='canal-json');
| |
superainbower
|
|
[hidden email]
|
签名由网易邮箱大师定制

Reply | Threaded
Open this post in threaded view
|

Re: 如何定义时态表

HunterXHunter
把格式调整下,很乱看不明白



--
Sent from: http://apache-flink.147419.n8.nabble.com/