flink 1.10 使用 createTemporaryTable 注册表,SQL 使用 order by 报错

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.10 使用 createTemporaryTable 注册表,SQL 使用 order by 报错

Hito Zhu
hi all,
flink 1.10 建议使用 createTemporaryTable 方法代替 registerTableSource
方法,替换后报错,错误信息和SQL如下:

Exception in thread "main" org.apache.flink.table.api.TableException:
Sort on a non-time-attribute field is not supported.
SQL:select code, ...,searchTime from table order by searchTime asc

不使用 order by 语句没问题。




Reply | Threaded
Open this post in threaded view
|

Re: flink 1.10 使用 createTemporaryTable 注册表,SQL 使用 order by 报错

Jingsong Li
Hi,

就像异常所说,streaming sql不支持非时间字段的order by。
你是怎么来指定时间字段的呢?

Best,
Jingsong Lee

On Fri, May 8, 2020 at 9:52 AM Hito Zhu <[hidden email]> wrote:

> hi all,
> flink 1.10 建议使用 createTemporaryTable 方法代替 registerTableSource
> 方法,替换后报错,错误信息和SQL如下:
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Sort on a non-time-attribute field is not supported.
> SQL:select code, ...,searchTime from table order by searchTime asc
>
> 不使用 order by 语句没问题。
>
>
>
>
>

--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.10 使用 createTemporaryTable 注册表,SQL 使用 order by 报错

Hito Zhu
感谢回答!
我定义了 schema 指定了时间字段,正如下面的代码,唯一的区别是使用
createTemporaryTable 替代原先的 registerTableSource 方法。
Rowtime rowtime = new Rowtime()
     .timestampsFromField("searchTime")
     .watermarksPeriodicBounded(5 * 1000);

Schema schema = new Schema()
     .field("code", DataTypes.INT())
     .field("results", DataTypes.ROW(
             DataTypes.FIELD("id", DataTypes.STRING()),
             DataTypes.FIELD("items",
                     DataTypes.ARRAY(
                             DataTypes.ROW(
                                     DataTypes.FIELD("id",
DataTypes.STRING()),
                                     DataTypes.FIELD("name",
DataTypes.STRING())
                             )
                     )
             )
     ))
     .field("rowTime", DataTypes.TIMESTAMP()).rowtime(rowtime);

tableEnv.connect(kafka)
     .withSchema(schema)
     .withFormat(new Json().failOnMissingField(false))
     .inAppendMode()
     // .registerTableSource("tb_json");
     .createTemporaryTable("tb_json");

SQL :SELECT rowTime as searchTime,  code, ...  FROM tb_json order by
searchTime ASC

Jingsong Li wrote on 2020/5/8 10:02 AM:

> Hi,
>
> 就像异常所说,streaming sql不支持非时间字段的order by。
> 你是怎么来指定时间字段的呢?
>
> Best,
> Jingsong Lee
>
> On Fri, May 8, 2020 at 9:52 AM Hito Zhu<[hidden email]>  wrote:
>
>> hi all,
>> flink 1.10 建议使用 createTemporaryTable 方法代替 registerTableSource
>> 方法,替换后报错,错误信息和SQL如下:
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Sort on a non-time-attribute field is not supported.
>> SQL:select code, ...,searchTime from table order by searchTime asc
>>
>> 不使用 order by 语句没问题。
>>
>>
>>
>>
>>

--
Sent from Postbox <https://www.postbox-inc.com>