关于Flink SQL中Interval Join使用时watermark的疑惑

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

关于Flink SQL中Interval Join使用时watermark的疑惑

xuty
在Flink SQL定义了两张kafka表(A表和B表),类型为debezium-json,然后要进行区间JOIN,SQL类似这样:

select * from A left join B
on A.id = B.id
and B.dt BETWEEN A.dt and A.dt + INTERVAL '30' SECOND

第一个问题是:想要在A和B表中显示定义watermark(dt字段即event_time)来解决可能出现的乱序问题,但是报错了,不太明白这个报错,是否是flink
sql中目前还不支持Interval Join中定义watermark?

Interval Join doesn't support consuming update and delete changes

第二个问题是:假如没有在A和B表中显示定义watermark,Job可以成功运行,是否会自动根据区间条件生成一个watermark,用于移除过期的state?

延迟问题:比如我的B流一直延迟不来新数据,A流是不是就一直state大于watermark的数据,是否有类似于datastream中定义statettl一样可以配置。

乱序问题:测试B流来了一条比较旧的数据,但是实际也能连接到A流中理论上已经过期的数据,这个不知道是不是和什么配置有关,还是state没有及时根据watermark清理导致,望解答。




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于Flink SQL中Interval Join使用时watermark的疑惑

HunterXHunter
1: Interval Join doesn't support consuming update and delete changes
是因为A或B是一个update stream

2:
 Interval Join 的临时数据是放在buffer中,当wtm超过边界时间就会清理掉 buffer也就join不到了。所以
statettl无法控制A流的缓存数据。
 延迟问题:所以如果wtm不更新,A流的数据不会被清理因为不受statettl控制

 乱序问题:如果B流的旧时间小于 watermark就join不上

以上是个人理解、、



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于Flink SQL中Interval Join使用时watermark的疑惑

HunterXHunter
In reply to this post by xuty
问题一: Interval Join doesn't support consuming update and delete
changes是因为输入A或者B是一个更新流
问题二:interval
join使用buffer来缓存A和B的数据,没有放在state里,只有在watermark超过下边界会触发onEventtime清理 buffer。
延迟问题:没有类似statettl的配置,interval join不受statettl控制清除状态
乱序问题:如果 B的数据时间小于 watermark则匹配不到,一切是跟watermmark相关

以上个人看源码理解的。希望有帮助



--
Sent from: http://apache-flink.147419.n8.nabble.com/