请教 如何使用TableAPI connector将一个字段定义为rowtime属性

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

请教 如何使用TableAPI connector将一个字段定义为rowtime属性

hegongyin
代码如下:
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment);

String[] names = {"timestamp", "user", "message"};
TypeInformation[] types = {Types.SQL_TIMESTAMP(), Types.LONG(), Types.STRING()};
tableEnvironment
        .connect(
                new Kafka()
                        .version("universal")
                        .topic("test-input")
                        .startFromEarliest()
                        .property("zookeeper.connect", "localhost:2181")
                        .property("bootstrap.servers", "localhost:9092")
        )
        .withFormat(
                new Csv().schema(Types.ROW(names, types))
        )
        .withSchema(
                new Schema()
                        .field("timestamp", Types.SQL_TIMESTAMP())
                        .rowtime(new Rowtime()
                                .timestampsFromField("timestamp")
                                .watermarksPeriodicBounded(60000)
                        )
                        .field("user", Types.LONG())
                        .field("message", Types.STRING())
        )
        .inAppendMode()
        .registerTableSource("MyUserTable");

假设数据源中有三个字段"timestamp", "user", "message",并通过withFormat定义,如何在定义withSchema时将timestamp字段设为rowtime属性?
以上代码会报错
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'timestamp' could not be resolved by the field mapping.

多次尝试后发现是new Schema().field(“fieldName”)中的字段名称和 .rowtime(new Rowtime().timestampsFromField("fieldName")中的字段名称一样的话就会报错。

所以如何在withSchema中定义rowtime时使用和withFormat中一样的字段名?


Kind regards
He Gongyin