Flink 1.9.1: How to read Kafka 0.8 JSON data with the Table API

Posted by Chennet Steven on Dec 24, 2019; 7:52am
URL: http://apache-flink.370.s1.nabble.com/Window-can-only-be-defined-over-a-time-attribute-column-tp13p1303.html

I just upgraded from 1.7 to 1.9, and the way Kafka is read seems to have changed. I could not find an example of reading from Kafka in 1.9. Could anyone point me to a demo?

The code below does not run under 1.9. It fails with:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'UptTimeMs' could not be resolved by the field mapping.
         at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
         at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
         at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
         at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
         at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
         at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
         at org.apache.flink.table.sources.TableSourceValidation.validateTimestampExtractorArguments(TableSourceValidation.java:204)
         at org.apache.flink.table.sources.TableSourceValidation.validateTableSource(TableSourceValidation.java:70)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.validateTableSource(TableEnvironmentImpl.java:435)
         at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.validateTableSource(StreamTableEnvironmentImpl.java:329)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSourceInternal(TableEnvironmentImpl.java:516)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSource(TableEnvironmentImpl.java:200)
         at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:70)
         at com.teld.demo.Kafka08App.FunA(Kafka08App.java:69)
         at com.teld.demo.Kafka08App.main(Kafka08App.java:23)


The code is as follows:


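import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

private static void FunA() throws Exception {
    // Local environment with the web UI enabled, using event time.
    Configuration localConfig = new Configuration();
    localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment ste = StreamTableEnvironment.create(env);

    // Kafka 0.8 connector descriptor.
    Kafka kafka08 = new Kafka()
            .version("0.8")
            .topic("BDP-ChargingMinuteMetric")
            .startFromEarliest()
            .property("zookeeper.connect", "hdpjntest.chinacloudapp.cn:2182/kafka08")
            .property("bootstrap.servers", "telddruidteal.chinacloudapp.cn:9095")
            .property("group.id", "abc");

    // Table schema: UptTimeMs is declared as the rowtime attribute,
    // with timestamps taken from the field of the same name.
    Schema schema = new Schema()
            .field("UptTimeMs", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
            .field("BillId", Types.STRING)
            .field("SOC", Types.DOUBLE)
            .field("HighestVoltage", Types.DOUBLE);

    // JSON format descriptor.
    TypeInformation<?>[] types = new TypeInformation<?>[]{Types.SQL_TIMESTAMP, Types.STRING, Types.DOUBLE, Types.DOUBLE};
    String[] fieldNames = new String[]{"UptTimeMs", "BillId", "SOC", "HighestVoltage"};
    TypeInformation<Row> typeInformation = new RowTypeInfo(types, fieldNames);
    FormatDescriptor formatDescriptor = new Json().failOnMissingField(false).schema(typeInformation).deriveSchema();

    // Registering the source is where the ValidationException is thrown.
    ste.connect(kafka08)
            .withFormat(formatDescriptor)
            .withSchema(schema)
            .inAppendMode()
            .registerTableSource("SourceTable");
    Table table = ste.sqlQuery("select * from SourceTable");

    // Print every row of the source table.
    DataStream<Row> rowDataStream = ste.toAppendStream(table, Row.class);
    rowDataStream.print();

    ste.execute("ABC");
}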



From stevenchen
         webchat 38798579