Hi,请教下大家,关于Temporal Tables,官方文档中的定义方法是
-- 定义一张版本表CREATETABLEproduct_changelog(product_idSTRING,product_nameSTRING,product_priceDECIMAL(10,4),update_timeTIMESTAMP(3)METADATAFROM'value.source.timestamp'VIRTUAL,PRIMARYKEY(product_id)NOTENFORCED,-- (1) 定义主键约束WATERMARKFORupdate_timeASupdate_time-- (2) 通过 watermark 定义事件时间 )WITH('connector'='kafka','topic'='products','scan.startup.mode'='earliest-offset','properties.bootstrap.servers'='localhost:9092','value.format'='debezium-json');
这里是debezium-json,是否可以不用debzium做cdc,利用canal呢?
我尝试了替换'value.format'='canal-json’ 会提示value.source.timestamp 这个在metadata中没有,只有timestamp
-- 定义一张版本表CREATETABLEproduct_changelog(product_idSTRING,product_nameSTRING,product_priceDECIMAL(10,4),update_timeTIMESTAMP(3)METADATAFROM'timestamp'VIRTUAL,PRIMARYKEY(product_id)NOTENFORCED,-- (1) 定义主键约束WATERMARKFORupdate_timeASupdate_time-- (2) 通过 watermark 定义事件时间 )WITH('connector'='kafka','topic'='products','scan.startup.mode'='earliest-offset','properties.bootstrap.servers'='localhost:9092','value.format'='canal-json');
| |
superainbower
|
|
[hidden email]
|
签名由网易邮箱大师定制