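Thanks for the answer!
I defined a schema that specifies the time field, as in the code below. The only
difference is that I now call createTemporaryTable instead of the registerTableSource
method I used before.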
// Event-time attribute read from the "searchTime" field,
// with a 5-second bounded out-of-orderness watermark.
Rowtime rowtime = new Rowtime()
        .timestampsFromField("searchTime")
        .watermarksPeriodicBounded(5 * 1000);
Schema schema = new Schema()
        .field("code", DataTypes.INT())
        .field("results", DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.STRING()),
                DataTypes.FIELD("items",
                        DataTypes.ARRAY(
                                DataTypes.ROW(
                                        DataTypes.FIELD("id", DataTypes.STRING()),
                                        DataTypes.FIELD("name", DataTypes.STRING())
                                )
                        )
                )
        ))
        // "rowTime" is declared as the rowtime attribute of the table.
        .field("rowTime", DataTypes.TIMESTAMP()).rowtime(rowtime);
tableEnv.connect(kafka)
        .withSchema(schema)
        .withFormat(new Json().failOnMissingField(false))
        .inAppendMode()
        // .registerTableSource("tb_json");
        .createTemporaryTable("tb_json");
SQL: SELECT rowTime AS searchTime, code, ... FROM tb_json ORDER BY searchTime ASC
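
For context, here is a rough sketch of why the sort key has to be a time attribute in streaming mode. With a bounded watermark (the 5-second bound above), a row can be emitted once the watermark has passed its timestamp, so only a bounded buffer is needed. Sorting on an arbitrary field would require seeing the whole unbounded stream first. This is a simplified plain-Java model, not Flink's actual implementation: the class name WatermarkSortSketch, the onRow method, and the choice to drop late rows are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical, simplified model of event-time sorting under a
 * bounded out-of-orderness watermark. Not Flink code.
 */
class WatermarkSortSketch {
    private final long maxDelayMs;                 // assumed bound, e.g. 5 * 1000
    private long maxSeenTs = Long.MIN_VALUE;       // largest timestamp seen so far
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();

    WatermarkSortSketch(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    /** Watermark modelled as (largest timestamp seen) minus (allowed delay). */
    long watermark() {
        return maxSeenTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeenTs - maxDelayMs;
    }

    /**
     * Accepts one row timestamp and returns the timestamps that can now be
     * emitted in ascending order. Rows at or behind the current watermark
     * are treated as late and dropped (an assumption of this sketch).
     */
    List<Long> onRow(long ts) {
        List<Long> out = new ArrayList<>();
        if (ts <= watermark()) {
            return out;                            // late row: dropped in this model
        }
        buffer.add(ts);
        maxSeenTs = Math.max(maxSeenTs, ts);
        long wm = watermark();
        // Everything at or below the watermark can no longer be preceded by a new row.
        while (!buffer.isEmpty() && buffer.peek() <= wm) {
            out.add(buffer.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        WatermarkSortSketch sorter = new WatermarkSortSketch(5 * 1000);
        long[] arrivals = {10_000L, 8_000L, 14_000L, 16_000L};
        for (long ts : arrivals) {
            System.out.println("in=" + ts + " out=" + sorter.onRow(ts));
        }
    }
}
```

In this model the out-of-order row 8000 is still emitted before 10000, because both stay buffered until the watermark moves past them.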
Jingsong Li wrote on 2020/5/8 10:02 AM:
> Hi,
>
> As the exception says, streaming SQL does not support ORDER BY on a non-time-attribute field.
> How are you specifying the time field?
>
> Best,
> Jingsong Lee
>
> On Fri, May 8, 2020 at 9:52 AM Hito Zhu <[hidden email]> wrote:
>
>> Hi all,
>> Flink 1.10 recommends using the createTemporaryTable method instead of
>> registerTableSource. After making the switch I get an error. The error message and SQL are below:
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Sort on a non-time-attribute field is not supported.
>> SQL: select code, ..., searchTime from table order by searchTime asc
>>
>> Without the ORDER BY clause, the query runs fine.