Hi xiao,
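Judging from the error, this SQL matched `StreamExecJoinRule`, and a regular join must not have rowtime attributes in its input rows.
The likely cause is that the rowtime of your kafkaSource1 table is passed through last_value after the group by, so it is no longer a time attribute type (`TimeIndicatorRelDataType`). When the rule then checks the join condition it finds no windowBounds, so the query is treated as a regular join and this error is reported.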
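A minimal, untested sketch of the workaround named in the error message: cast the rowtime of kafkaSource2 to a plain TIMESTAMP so that neither input of the regular join carries a time attribute. The view name `kafkaSource2_view` is hypothetical, the columns `ts1`/`ts2` are taken from your DDL in place of `a.ts`/`b.ts`, and the sketch assumes `kafkaSource1_view` also exposes an `id` column, which the view as posted does not select.

```sql
-- Sketch only, not verified against Flink 1.11.2.
-- Materialize the rowtime of stream 2: after the cast, ts2 is an ordinary
-- TIMESTAMP(3) column and no longer a time attribute.
create view kafkaSource2_view as
select
  id,
  field_3,
  cast(ts2 as timestamp(3)) as ts2
from kafkaSource2;

-- Assumption: kafkaSource1_view has been extended to expose an id column.
-- field_1 comes from the aggregated view (b), field_3 from stream 2 (a).
insert into sinkTable
select
  b.field_1,
  a.field_3
from kafkaSource2_view a join kafkaSource1_view b
on a.id = b.id
and a.ts2 >= b.ts1 - INTERVAL '1' HOUR and a.ts2 < b.ts1 + INTERVAL '2' DAY;
```

Note that this runs as a regular join, not an interval join: the time condition acts only as a filter, so the join state is not bounded by watermarks and may need idle state retention to be configured.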
Best,
Hailong Wang
On 2020-11-03 18:27:51, "xiao cai" <[hidden email]> wrote:
>Hi :
>Flink version: 1.11.2
>Problem: when joining two streams using last_value + interval join, I get the error: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
>
>
>Code:
>// stream 1
>create table kafkaSource1 (
>id int,
>field_1 int,
>field_2 varchar,
>ts1 timestamp(3),
>watermark for `ts1`
>) with (
>connector = kafka
>)
>// stream 2
>create table kafkaSource2 (
>id int,
>field_3
>ts2 timestamp(3),
>watermark for `ts2`
>) with (
>connector = kafka
>)
>
>
>//create view
>create view kafkaSource1_view as
>select
>field_1 as field_1,
>last_value(field_2) as field_2,
>last_value(ts1) as ts1
>from kafkaSource1
>group by field_1
>
>
>// query
>insert into sinkTable
>select
>a.field_1,
>b.field_3
>from kafkaSource2 a join kafkaSource1_view b
>on a.id = b.id
>and a.ts >= b.ts - INTERVAL '1' HOUR and a.ts < b.ts + INTERVAL '2' DAY