Re:FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Re:FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

xiao cai
Hi wang:
非常感谢解答,我先顺着你的思路去详细了解下这个过程。
Good luck.
Best,
xiao


 原始邮件
发件人: hailongwang<[hidden email]>
收件人: user-zh<[hidden email]>
发送时间: 2020年11月3日(周二) 21:42
主题: Re:FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题


Hi xiao, 从报错来看,这个 SQL 应该是 match 了 `StreamExecJoinRule`,而 regular join 不能有 rowtime 属性。 应该是因为你的 kafkaSouce1 table 的 rowtime 经过 group by 后使用了 last_value 导致不是时间属性类型->`TimeIndicatorRelDataType`,而在 rule 进行判断后没有 windowBounds,所以就报了现在这个错误了。 Best, Hailong Wang 在 2020-11-03 18:27:51,"xiao cai" <[hidden email]> 写道: >Hi : >flink 版本 1.11.2 >问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. > > >代码: >// stream 1 >create table kafkaSource1 ( >id int, >field_1 int, >field_2 varchar, >ts1 timestamp(3), >watermark for `ts1` >) with ( >connector = kafka >) >// stream 2 >create table kafkaSource2 ( >id int, >field_3 >ts2 timestamp(3), >watermark for `ts2` >) with ( >connector = kafka >) > > >//create view >create view kafkaSource1_view as >select >field_1 as field_1, >last_value(field_2) as field_2, >last_value(ts1) as ts1 >from kafkaSouce1 >group by field_1 > > >// query >insert into sinkTable >select >a.field_1, >b.field_3 >from kafkaSource2 a join kafkaSource1_view b >on a.id = b.id >and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY