How do I set an existing field as the rowtime attribute?

苏 欣
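Hi all, I want to use a field from my Kafka messages as the rowtime attribute, but I ran into the problems below. I am using Flink 1.9.1.
Here are the two approaches I tried; both fail with an error. Has anyone run into something similar, and how did you solve it? Thanks!
Code 1: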

tEnv.connect(
        new Kafka()
                .version("universal")
                .topic("flink-test-dept-1")
                .startFromGroupOffsets()
                .property("bootstrap.servers", "192.168.129.101:9192")
                .property("group.id", "flink-test-consumer-group")
).withFormat(new Json()
        .failOnMissingField(false)
        .deriveSchema()
).withSchema(new Schema()
        .field("dept_id", Types.INT)
        .field("dept_name", Types.STRING)
        .field("crt_time", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime()
                .timestampsFromField("crt_time")
                .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(1000))
        )
        .field("proc_time", Types.SQL_TIMESTAMP).proctime()
).inAppendMode()
.registerTableSource("dept");

Error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'crt_time' could not be resolved by the field mapping.
       at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
       at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
       at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
       at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
       at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
       at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
       at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
       at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
       at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
       at org.apache.flink.table.sources.TableSourceValidation.validateTimestampExtractorArguments(TableSourceValidation.java:204)
       at org.apache.flink.table.sources.TableSourceValidation.validateTableSource(TableSourceValidation.java:70)
       at org.apache.flink.table.api.internal.TableEnvironmentImpl.validateTableSource(TableEnvironmentImpl.java:435)
       at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.validateTableSource(StreamTableEnvironmentImpl.java:329)
       at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSourceInternal(TableEnvironmentImpl.java:516)
       at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSource(TableEnvironmentImpl.java:200)
       at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:70)
       at com.sean.TimeWindowExample.main(TimeWindowExample.java:47)


Code 2:

tEnv.connect(
        new Kafka()
                .version("universal")
                .topic("flink-test-dept-1")
                .startFromGroupOffsets()
                .property("bootstrap.servers", "192.168.129.101:9192")
                .property("group.id", "flink-test-consumer-group")
).withFormat(new Json()
        .failOnMissingField(false)
        .deriveSchema()
).withSchema(new Schema()
        .field("dept_id", Types.INT)
        .field("dept_name", Types.STRING)
        .field("crt_time", Types.SQL_TIMESTAMP)
        .field("row_time", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime()
                .timestampsFromField("crt_time")
                .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(1000))
        )
        .field("proc_time", Types.SQL_TIMESTAMP).proctime()
).inAppendMode()
.registerTableSource("dept");

Error:
Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
       at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
       at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
       at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
       at com.sean.TimeWindowExample.main(TimeWindowExample.java:48)
Caused by: org.apache.flink.table.api.TableException: Field names must be unique.
List of duplicate fields: [crt_time]
List of all fields: [dept_id, dept_name, crt_time, crt_time]
       at org.apache.flink.table.api.TableSchema.<init>(TableSchema.java:94)
       at org.apache.flink.table.api.TableSchema.<init>(TableSchema.java:49)
       at org.apache.flink.table.api.TableSchema$Builder.build(TableSchema.java:352)
       at org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema(TableFormatFactoryBase.java:163)
       at org.apache.flink.formats.json.JsonRowFormatFactory.createTypeInformation(JsonRowFormatFactory.java:88)
       at org.apache.flink.formats.json.JsonRowFormatFactory.createDeserializationSchema(JsonRowFormatFactory.java:62)
       at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:263)
       at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
       at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
       at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:65)
       ... 3 more
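
The field list in the second error, [dept_id, dept_name, crt_time, crt_time], suggests how the JSON format schema is derived from the table schema when deriveSchema() is used: proc_time is dropped, and row_time appears to be replaced by the field it reads its timestamps from. The plain-Java sketch below only models that apparent behavior in order to reproduce the duplicate. It is not Flink code; the class DeriveSchemaSketch, the Field type, and all method names are hypothetical, and the variant without a separate crt_time column is an untested assumption, not a confirmed fix for Flink 1.9.1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of format-schema derivation; not Flink code.
class DeriveSchemaSketch {

    /** A declared table field (hypothetical type, for illustration only). */
    static final class Field {
        final String name;
        final boolean proctime;    // true if declared with .proctime()
        final String rowtimeFrom;  // source field of a rowtime attribute, else null

        Field(String name, boolean proctime, String rowtimeFrom) {
            this.name = name;
            this.proctime = proctime;
            this.rowtimeFrom = rowtimeFrom;
        }
    }

    /** Field names the format is assumed to read from the JSON message. */
    static List<String> deriveFormatFields(List<Field> declared) {
        List<String> result = new ArrayList<>();
        for (Field f : declared) {
            if (f.proctime) {
                continue; // processing time is not part of the message
            }
            // A rowtime attribute contributes its source field, not its own name.
            result.add(f.rowtimeFrom != null ? f.rowtimeFrom : f.name);
        }
        return result;
    }

    /** Names that occur more than once, in first-seen order. */
    static Set<String> duplicates(List<String> names) {
        Set<String> seen = new LinkedHashSet<>();
        Set<String> dups = new LinkedHashSet<>();
        for (String n : names) {
            if (!seen.add(n)) {
                dups.add(n);
            }
        }
        return dups;
    }

    /** The field declaration from Code 2. */
    static List<Field> codeTwo() {
        return Arrays.asList(
                new Field("dept_id", false, null),
                new Field("dept_name", false, null),
                new Field("crt_time", false, null),
                new Field("row_time", false, "crt_time"),
                new Field("proc_time", true, null));
    }

    /** Variant that drops the separate crt_time column (untested assumption). */
    static List<Field> withoutPlainCrtTime() {
        return Arrays.asList(
                new Field("dept_id", false, null),
                new Field("dept_name", false, null),
                new Field("row_time", false, "crt_time"),
                new Field("proc_time", true, null));
    }

    public static void main(String[] args) {
        List<String> a = deriveFormatFields(codeTwo());
        // prints: [dept_id, dept_name, crt_time, crt_time] duplicates=[crt_time]
        System.out.println(a + " duplicates=" + duplicates(a));

        List<String> b = deriveFormatFields(withoutPlainCrtTime());
        // prints: [dept_id, dept_name, crt_time] duplicates=[]
        System.out.println(b + " duplicates=" + duplicates(b));
    }
}
```

Under this model, declaring both crt_time and a row_time that reads from crt_time yields crt_time twice, which matches the "Field names must be unique" message. Declaring only row_time with timestampsFromField("crt_time") would yield unique names, but whether that also avoids the field-mapping error from Code 1 would need to be checked against Flink 1.9.1 itself.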
