Flink version: 1.7.2
I am using the Flink Table API to read JSON data from Kafka, with the JsonSchema shown below. It seems that the rowtime cannot be specified from a field inside a nested JSON object. Could someone confirm whether the rowtime can only be specified from a top-level field?

    tableEnv.connect(
      new Kafka()
        .version("0.10")
        .topic(topic_in)
        .property("bootstrap.servers", brokers)
        .property("group.id", "TableApiT2")
        .startFromLatest()
    ).withFormat(
      new Json()
        .jsonSchema(
          """
            |{
            |  "type": 'object',
            |  "properties": {
            |    "metric": {
            |      "type": 'object',
            |      "properties": {
            |        "time_stamp": {
            |          "type": 'string',
            |          format: 'date-time'
            |        },
            |        "event_time": {
            |          "type": 'string'
            |        },
            |        "cluster": {
            |          "type": 'string'
            |        },
            |        "host": {
            |          "type": 'string'
            |        },
            |        "instance": {
            |          "type": 'string'
            |        },
            |        "index_name": {
            |          "type": 'string'
            |        },
            |        "index_num": {
            |          "type": 'string'
            |        },
            |        "value": {
            |          "type": 'number'
            |        }
            |      }
            |    },
            |    "source": {
            |      "type": 'string'
            |    }
            |  }
            |}
            |""".stripMargin
        )
    ).withSchema(
      new Schema()
        .field("metric",
          Types.ROW_NAMED(
            Array("time_stamp", "event_time", "cluster", "host", "instance",
              "index_name", "index_num", "value"),
            Types.SQL_TIMESTAMP,
            Types.STRING,
            Types.STRING,
            Types.STRING,
            Types.STRING,
            Types.STRING,
            Types.STRING,
            Types.BIG_DEC
          )
        ) // *** How do I declare time_stamp in the row type above as the rowtime?
    ).inAppendMode()
      .registerTableSource("metricTable")
Administrator
In reply to kylin <[hidden email]> (Thu, 17 Sep 2020 at 10:41):

1. Only top-level fields can be specified as the rowtime. To use a nested field as the rowtime, first derive a top-level column from it with a computed column (computed columns are supported only in DDL).
2. The Descriptor API has many problems and is missing many features, so it is not recommended; use DDL instead. The Descriptor API will be refactored in version 1.12.

Best,
Jark
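A minimal sketch of the DDL approach from point 1, assuming Flink 1.11 with the Blink planner and with the Kafka 0.10 SQL connector (`kafka-0.10`) and `flink-json` on the classpath; it is not taken from the thread. The computed column `ts AS metric.time_stamp` lifts the nested field to the top level, and the `WATERMARK` clause then declares `ts` as the rowtime attribute. The topic name, broker address, the 5-second watermark delay, the `DECIMAL(38, 18)` precision for `value`, and the tumbling-window query are hypothetical illustrations. Depending on how `time_stamp` is actually encoded in the messages, the JSON format's timestamp parsing may also need adjusting.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object NestedRowtimeDdl {

  // Builds the CREATE TABLE statement (sketch, assuming Flink 1.11 option names).
  // The computed column `ts` copies the nested metric.time_stamp field to the
  // top level so that the WATERMARK clause can declare it as the rowtime.
  // Possibly reserved words (`instance`, `value`, `source`) are backtick-quoted.
  def ddl(topic: String, brokers: String): String =
    s"""
       |CREATE TABLE metricTable (
       |  metric ROW<
       |    time_stamp TIMESTAMP(3),
       |    event_time STRING,
       |    cluster STRING,
       |    host STRING,
       |    `instance` STRING,
       |    index_name STRING,
       |    index_num STRING,
       |    `value` DECIMAL(38, 18)
       |  >,
       |  `source` STRING,
       |  ts AS metric.time_stamp,
       |  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
       |) WITH (
       |  'connector' = 'kafka-0.10',
       |  'topic' = '$topic',
       |  'properties.bootstrap.servers' = '$brokers',
       |  'properties.group.id' = 'TableApiT2',
       |  'scan.startup.mode' = 'latest-offset',
       |  'format' = 'json'
       |)
       |""".stripMargin

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Computed columns and WATERMARK clauses in DDL require the Blink planner.
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Hypothetical topic and broker values; substitute topic_in and brokers.
    tableEnv.executeSql(ddl("metric_topic", "localhost:9092"))

    // Hypothetical query: ts is now a rowtime attribute, so it can drive an
    // event-time tumbling window.
    tableEnv.executeSql(
      """
        |SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS w_start, COUNT(*) AS cnt
        |FROM metricTable
        |GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)
        |""".stripMargin).print()
  }
}
```

Keeping `metric` as a `ROW` column means the original nested structure stays queryable (for example `metric.host`), while only the timestamp is duplicated at the top level for the watermark.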