各位好,我想使用kafka消息中的某个字段作为rowtime属性,遇到了以下问题,使用flink版本为1.9.1。
以下是我尝试的两种用法,都会报错。请问大家有没有遇到过类似的问题,怎么解决的,谢谢! 代码一: tEnv.connect( new Kafka() .version("universal") .topic("flink-test-dept-1") .startFromGroupOffsets() .property("bootstrap.servers", "192.168.129.101:9192") .property("group.id", "flink-test-consumer-group") ).withFormat(new Json() .failOnMissingField(false) .deriveSchema() ).withSchema(new Schema() .field("dept_id", Types.INT) .field("dept_name", Types.STRING) .field("crt_time", Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("crt_time") .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(1000)) ) .field("proc_time", Types.SQL_TIMESTAMP).proctime() ).inAppendMode() .registerTableSource("dept"); 报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'crt_time' could not be resolved by the field mapping. at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245) at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545) at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) at org.apache.flink.table.sources.TableSourceValidation.validateTimestampExtractorArguments(TableSourceValidation.java:204) at org.apache.flink.table.sources.TableSourceValidation.validateTableSource(TableSourceValidation.java:70) at org.apache.flink.table.api.internal.TableEnvironmentImpl.validateTableSource(TableEnvironmentImpl.java:435) at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.validateTableSource(StreamTableEnvironmentImpl.java:329) at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSourceInternal(TableEnvironmentImpl.java:516) at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSource(TableEnvironmentImpl.java:200) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:70) at com.sean.TimeWindowExample.main(TimeWindowExample.java:47) 代码二: tEnv.connect( new Kafka() .version("universal") .topic("flink-test-dept-1") .startFromGroupOffsets() .property("bootstrap.servers", "192.168.129.101:9192") .property("group.id", "flink-test-consumer-group") ).withFormat(new Json() .failOnMissingField(false) .deriveSchema() ).withSchema(new Schema() .field("dept_id", Types.INT) .field("dept_name", Types.STRING) .field("crt_time", Types.SQL_TIMESTAMP) .field("row_time", Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("crt_time") .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(1000)) ) .field("proc_time", Types.SQL_TIMESTAMP).proctime() ).inAppendMode() .registerTableSource("dept"); 报错: Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed. at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67) at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69) at com.sean.TimeWindowExample.main(TimeWindowExample.java:48) Caused by: org.apache.flink.table.api.TableException: Field names must be unique. List of duplicate fields: [crt_time] List of all fields: [dept_id, dept_name, crt_time, crt_time] at org.apache.flink.table.api.TableSchema.<init>(TableSchema.java:94) at org.apache.flink.table.api.TableSchema.<init>(TableSchema.java:49) at org.apache.flink.table.api.TableSchema$Builder.build(TableSchema.java:352) at org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema(TableFormatFactoryBase.java:163) at org.apache.flink.formats.json.JsonRowFormatFactory.createTypeInformation(JsonRowFormatFactory.java:88) at org.apache.flink.formats.json.JsonRowFormatFactory.createDeserializationSchema(JsonRowFormatFactory.java:62) at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:263) at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144) at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49) at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:65) ... 3 more 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 |
Free forum by Nabble | Edit this page |