Flink1.9.1,TableApi如何读取Kafka08Json的数据
Posted by Chennet Steven on Dec 24, 2019; 7:52am
URL: http://apache-flink.370.s1.nabble.com/Window-can-only-be-defined-over-a-time-attribute-column-tp13p1303.html
刚从1.7升级到1.9,感觉kafka的读取方式有了变化,没找到example关于1.9读取kafka的example,谁能给个demo的地址啊?
下面这个代码在1.9下没有跑通过,提示
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'UptTimeMs' could not be resolved by the field mapping.
at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at org.apache.flink.table.sources.TableSourceValidation.validateTimestampExtractorArguments(TableSourceValidation.java:204)
at org.apache.flink.table.sources.TableSourceValidation.validateTableSource(TableSourceValidation.java:70)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.validateTableSource(TableEnvironmentImpl.java:435)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.validateTableSource(StreamTableEnvironmentImpl.java:329)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSourceInternal(TableEnvironmentImpl.java:516)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSource(TableEnvironmentImpl.java:200)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:70)
at com.teld.demo.Kafka08App.FunA(Kafka08App.java:69)
at com.teld.demo.Kafka08App.main(Kafka08App.java:23)
代码如下
private static void FunA() throws Exception {
Configuration localConfig = new Configuration();
localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment ste = StreamTableEnvironment.create(env);
Kafka kafka08 = new Kafka()
.version("0.8")
.topic("BDP-ChargingMinuteMetric")
.startFromEarliest()
.property("zookeeper.connect", "hdpjntest.chinacloudapp.cn:2182/kafka08")
.property("bootstrap.servers", "telddruidteal.chinacloudapp.cn:9095")
.property("group.id", "abc");
Schema schema = new Schema()
.field("UptTimeMs", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
.field("BillId", Types.STRING)
.field("SOC", Types.DOUBLE)
.field("HighestVoltage", Types.DOUBLE);
TypeInformation<?>[] types = new TypeInformation<?>[]{Types.SQL_TIMESTAMP, Types.STRING, Types.DOUBLE, Types.DOUBLE};
String[] fieldNames = new String[]{"UptTimeMs", "BillId", "SOC", "HighestVoltage"};
TypeInformation<Row> typeInformation = new RowTypeInfo(types, fieldNames);
FormatDescriptor formatDescriptor = new Json().failOnMissingField(false).schema(typeInformation).deriveSchema();
ste.connect(kafka08).withFormat(formatDescriptor).withSchema(schema).inAppendMode().registerTableSource("SourceTable");
Table table = ste.sqlQuery("select * from SourceTable");
DataStream<Row> rowDataStream = ste.toAppendStream(table, Row.class);
rowDataStream.print();
ste.execute("ABC");
}
From stevenchen
webchat 38798579