Processing time attribute 'pt' is not of type SQL_TIMESTAMP.

marble.zhong@coinflex.com.INVALID
hi,

I ran into the following exception when using the SQL API:
Processing time attribute 'proctime' is not of type SQL_TIMESTAMP.

This is how I map the table:
        tEnv.connect(getPulsarDescriptor(inTopic))
                .withSchema( new Schema()
                        .field("marketId", DataTypes.BIGINT())
                        .field("midPrice", DataTypes.DECIMAL(38,18))
                        .field("spotPrice", DataTypes.DECIMAL(38,18))
                        .field("lastUpdated", DataTypes.BIGINT())
//                        .field("pt", "SQL_TIMESTAMP").proctime()
                        .field("pt", DataTypes.TIMESTAMP(3)).proctime()
                        )
                .inAppendMode()
                .createTemporaryTable(tableName);

The SQL is:
String query = "select marketId, avg(midPrice/spotPrice) as price from "
                + tableName
                + " group by hop(pt, INTERVAL '1' second, INTERVAL '10' minute), marketId";


Thanks




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Processing time attribute 'pt' is not of type SQL_TIMESTAMP.

Jark
Administrator
Hi,

The connect API currently has many problems. I recommend registering the table with DDL instead.
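
For illustration, a minimal sketch of what the DDL route might look like for the table in the question. It assumes Flink 1.11 or later, where a processing-time attribute is declared as a computed column (`pt AS PROCTIME()`) rather than as a physical `TIMESTAMP(3)` field. The class name `ProctimeDdlSketch`, the table name `prices`, and the `'connector' = '...'` placeholder are hypothetical; the actual connector options for Pulsar depend on the connector version in use and are not filled in here.

```java
public class ProctimeDdlSketch {

    // Builds a CREATE TABLE statement in which `pt` is a computed
    // processing-time column, not a physical field read from the source.
    static String buildDdl(String tableName) {
        return "CREATE TABLE " + tableName + " (\n"
                + "  marketId BIGINT,\n"
                + "  midPrice DECIMAL(38, 18),\n"
                + "  spotPrice DECIMAL(38, 18),\n"
                + "  lastUpdated BIGINT,\n"
                + "  pt AS PROCTIME()\n"
                + ") WITH (\n"
                // Placeholder: replace with the real connector options.
                + "  'connector' = '...'\n"
                + ")";
    }

    // Same hopping-window aggregation as in the original question.
    static String buildQuery(String tableName) {
        return "SELECT marketId, AVG(midPrice / spotPrice) AS price FROM " + tableName
                + " GROUP BY HOP(pt, INTERVAL '1' SECOND, INTERVAL '10' MINUTE), marketId";
    }

    public static void main(String[] args) {
        String tableName = "prices"; // hypothetical table name
        System.out.println(buildDdl(tableName));
        System.out.println(buildQuery(tableName));
        // With a Flink TableEnvironment `tEnv`, these strings would be used as:
        //   tEnv.executeSql(buildDdl(tableName));
        //   Table result = tEnv.sqlQuery(buildQuery(tableName));
    }
}
```

The statements are built as plain strings so the sketch runs without Flink on the classpath; the commented calls at the end show where they would be passed to the table environment.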

Best,
Jark

On Thu, 24 Sep 2020 at 16:39, [hidden email]
<[hidden email]> wrote:

> hi,
>
> I ran into the following exception when using the SQL API:
> Processing time attribute 'proctime' is not of type SQL_TIMESTAMP.
>
> This is how I map the table:
>         tEnv.connect(getPulsarDescriptor(inTopic))
>                 .withSchema( new Schema()
>                         .field("marketId", DataTypes.BIGINT())
>                         .field("midPrice", DataTypes.DECIMAL(38,18))
>                         .field("spotPrice", DataTypes.DECIMAL(38,18))
>                         .field("lastUpdated", DataTypes.BIGINT())
> //                        .field("pt", "SQL_TIMESTAMP").proctime()
>                         .field("pt", DataTypes.TIMESTAMP(3)).proctime()
>                         )
>                 .inAppendMode()
>                 .createTemporaryTable(tableName);
>
> The SQL is:
> String query = "select marketId, avg(midPrice/spotPrice) as price from "
>                 + tableName
>                 + " group by hop(pt, INTERVAL '1' second, INTERVAL '10' minute), marketId";
>
>
> Thanks
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>