Flink SQL文件连接器中proctime()创建表失败的问题

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL文件连接器中proctime()创建表失败的问题

gaoxing
Flink版本:1.10.0

使用了BlinkPlanner之后,使用java代码创建本地的CSV表时,不支持proctime的配置

创建代码如下:
        tableEnv.connect(new FileSystem()
                .path("file:///Users/test/csv/demo.csv")
        )
                .withFormat(new Csv())
                .withSchema(
                        new Schema()
                                .field("id", DataTypes.STRING())
                                .field("name", DataTypes.STRING())
                                .field("user_action_time", DataTypes.TIMESTAMP(3)).proctime()
                )
                .registerTableSource("csv_table");

异常信息如下:
Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Unsupported property keys:
schema.#.proctime

如果去掉proctime的设置就没有问题。使用DDL创建则没有任何问题:
        tableEnv.sqlUpdate("create table csv_table(" +
                "id string," +
                "name string," +
                "user_action_time as PROCTIME()" +
                ") with (" +
                " 'connector.type' = 'filesystem'," +
                " 'connector.path' = 'file:///Users/test/csv/demo.csv'," +
                " 'format.type' = 'csv'" +
                ")");
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL文件连接器中proctime()创建表失败的问题

Jark
Administrator
这应该是个已知问题,https://issues.apache.org/jira/browse/FLINK-16160
请先使用 DDL 吧。

Best,
Jark

On Fri, 10 Apr 2020 at 16:21, Night_xing <[hidden email]> wrote:

> Flink版本:1.10.0
>
> 使用了BlinkPlanner之后,使用java代码创建本地的CSV表时,不支持proctime的配置
>
> 创建代码如下:
>         tableEnv.connect(new FileSystem()
>                 .path("file:///Users/test/csv/demo.csv")
>         )
>                 .withFormat(new Csv())
>                 .withSchema(
>                         new Schema()
>                                 .field("id", DataTypes.STRING())
>                                 .field("name", DataTypes.STRING())
>                                 .field("user_action_time",
> DataTypes.TIMESTAMP(3)).proctime()
>                 )
>                 .registerTableSource("csv_table");
>
> 异常信息如下:
> Reason: No factory supports all properties.
>
> The matching candidates:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> Unsupported property keys:
> schema.#.proctime
>
> 如果去掉proctime的设置就没有问题。使用DDL创建则没有任何问题:
>         tableEnv.sqlUpdate("create table csv_table(" +
>                 "id string," +
>                 "name string," +
>                 "user_action_time as PROCTIME()" +
>                 ") with (" +
>                 " 'connector.type' = 'filesystem'," +
>                 " 'connector.path' = 'file:///Users/test/csv/demo.csv'," +
>                 " 'format.type' = 'csv'" +
>                 ")");