FlinkSQL双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkSQL双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

xiao cai
Hi :
flink 版本 1.11.2
问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.


代码:
// stream 1
create table kafkaSource1 (
id int,
field_1 int,
field_2 varchar,
ts1 timestamp(3),
watermark for `ts1`
) with (
connector = kafka
)
// stream 2
create table kafkaSource2 (
id int,
field_3 varchar,
ts2 timestamp(3),
watermark for `ts2`
) with (
connector = kafka
)


//create view
create view kafkaSource1_view as
select
field_1 as field_1,
last_value(field_2) as field_2,
last_value(ts1) as ts1
from kafkaSource1
group by field_1


// query
insert into sinkTable
select
a.field_1,
b.field_3
from kafkaSource2 a join kafkaSource1_view b
on a.id = b.id
and a.ts >= b.ts - INTERVAL '1' HOUR and a.ts < b.ts + INTERVAL '2' DAY
Reply | Threaded
Open this post in threaded view
|

Re:FlinkSQL双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

hailongwang
Hi xiao,


从报错来看,这个 SQL 应该是 match 了 `StreamExecJoinRule`,而 regular join 不能有 rowtime 属性。
应该是因为你的 kafkaSource1 table 的 rowtime 经过 group by 后使用了 last_value 导致不是时间属性类型->`TimeIndicatorRelDataType`,而在 rule 进行判断后没有 windowBounds,所以就报了现在这个错误了。


Best,
Hailong Wang

在 2020-11-03 18:27:51,"xiao cai" <[hidden email]> 写道:

>Hi :
>flink 版本 1.11.2
>问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
>
>
>代码:
>// stream 1
>create table kafkaSource1 (
>id int,
>field_1 int,
>field_2 varchar,
>ts1 timestamp(3),
>watermark for `ts1`
>) with (
>connector = kafka
>)
>// stream 2
>create table kafkaSource2 (
>id int,
>field_3 varchar,
>ts2 timestamp(3),
>watermark for `ts2`
>) with (
>connector = kafka
>)
>
>
>//create view
>create view kafkaSource1_view as
>select
>field_1 as field_1,
>last_value(field_2) as field_2,
>last_value(ts1) as ts1
>from kafkaSource1
>group by field_1
>
>
>// query
>insert into sinkTable
>select
>a.field_1,
>b.field_3
>from kafkaSource2 a join kafkaSource1_view b
>on a.id = b.id
>and a.ts >= b.ts - INTERVAL '1' HOUR and a.ts < b.ts + INTERVAL '2' DAY