|
The code is as follows:
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
String[] names = {"timestamp", "user", "message"};
TypeInformation[] types = {Types.SQL_TIMESTAMP(), Types.LONG(), Types.STRING()};
tableEnvironment
    .connect(
        new Kafka()
            .version("universal")
            .topic("test-input")
            .startFromEarliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")
    )
    .withFormat(
        new Csv().schema(Types.ROW(names, types))
    )
    .withSchema(
        new Schema()
            .field("timestamp", Types.SQL_TIMESTAMP())
            .rowtime(new Rowtime()
                .timestampsFromField("timestamp")
                .watermarksPeriodicBounded(60000)
            )
            .field("user", Types.LONG())
            .field("message", Types.STRING())
    )
    .inAppendMode()
    .registerTableSource("MyUserTable");
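Suppose the data source has three fields, "timestamp", "user", and "message", which are declared through withFormat. How can the timestamp field be marked as the rowtime attribute when defining withSchema?
The code above fails with: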
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'timestamp' could not be resolved by the field mapping.
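After several attempts, I found that the error occurs whenever the field name in new Schema().field("fieldName") is the same as the field name in .rowtime(new Rowtime().timestampsFromField("fieldName")).
So how can I define rowtime in withSchema using the same field name as in withFormat?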
Kind regards
He Gongyin
|