FlinkSQL双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

FlinkSQL双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

xiao cai
Hi :
flink 版本 1.11.2
问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.


代码:
-- Stream 1: Kafka-backed source table; ts1 is declared as the event-time
-- (rowtime) attribute through the watermark clause.
create table kafkaSource1 (
    id int,
    field_1 int,
    field_2 varchar,
    ts1 timestamp(3),
    -- A watermark declaration needs an "AS <strategy>" expression. The original
    -- post elided it; the 5-second out-of-orderness bound is a placeholder.
    watermark for `ts1` as `ts1` - INTERVAL '5' SECOND
) with (
    -- Option keys and values must be quoted string literals.
    -- The remaining Kafka options (topic, bootstrap servers, format, ...)
    -- were elided in the original post and still have to be supplied.
    'connector' = 'kafka'
);
-- Stream 2: Kafka-backed source table; ts2 is declared as the event-time
-- (rowtime) attribute through the watermark clause.
create table kafkaSource2 (
    id int,
    -- NOTE(review): the original post gave no type for field_3; varchar is
    -- an assumption, confirm against the actual topic schema.
    field_3 varchar,
    ts2 timestamp(3),
    -- A watermark declaration needs an "AS <strategy>" expression. The original
    -- post elided it; the 5-second out-of-orderness bound is a placeholder.
    watermark for `ts2` as `ts2` - INTERVAL '5' SECOND
) with (
    -- Option keys and values must be quoted string literals.
    -- The remaining Kafka options (topic, bootstrap servers, format, ...)
    -- were elided in the original post and still have to be supplied.
    'connector' = 'kafka'
);


-- View keeping the most recent values of stream 1 per field_1.
-- This is a non-windowed aggregation, so the view produces an updating
-- result, and last_value(ts1) yields a plain TIMESTAMP(3) rather than a
-- rowtime attribute. A join against this view is therefore planned as a
-- regular join, not as an interval join.
create view kafkaSource1_view as
select
    field_1 as field_1,
    -- NOTE(review): the downstream join references b.id, which the original
    -- view did not expose. Confirm whether the latest id per field_1 is the
    -- intended join key, or whether the view should group by id instead.
    last_value(id) as id,
    last_value(field_2) as field_2,
    last_value(ts1) as ts1
from kafkaSource1
group by field_1;


-- Query: join stream 2 against the latest-value view of stream 1 and write
-- the matches to sinkTable.
--
-- kafkaSource1_view is an updating aggregate, so this is planned as a regular
-- join. A regular join rejects rowtime attributes in its input rows, which is
-- the reported "Rowtime attributes must not be in the input rows of a regular
-- join" error. The subquery casts ts2 to a plain TIMESTAMP(3) before the join.
--
-- NOTE(review): a regular join keeps state for both inputs; configure idle
-- state retention if the key space is unbounded.
-- NOTE(review): the join key b.id must be exposed by kafkaSource1_view.
insert into sinkTable
select
    b.field_1,
    a.field_3
from (
    select
        id,
        field_3,
        cast(ts2 as timestamp(3)) as ts2
    from kafkaSource2
) a
join kafkaSource1_view b
    on a.id = b.id
    and a.ts2 >= b.ts1 - INTERVAL '1' HOUR
    and a.ts2 < b.ts1 + INTERVAL '2' DAY;