flink table api中无法设置子json中的列为rowtime

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

flink table api中无法设置子json中的列为rowtime

kylin
flink版本1.7.2

flink table api从kafka读取json数据,JsonSchema如下图所示
发现rowtime无法从子json中的字段指定,麻烦帮忙确认下rowtime是否只能从顶层的字段来指定?
tableEnv.connect(
  new Kafka()
    .version("0.10")
    .topic(topic_in)
    .property("bootstrap.servers", brokers)
    .property("group.id", "TableApiT2")
    .startFromLatest()
).withFormat(
  new Json()
    .jsonSchema(
      """
        |{
        |  "type": 'object',
        |  "properties": {
        |   "metric": {
        |    "type": 'object',
        |         "properties": {
        |           "time_stamp": {
        |       "type": 'string',
        |       format: 'date-time'
        |      },
        |      "event_time": {
        |       "type": 'string'
        |      },
        |      "cluster": {
        |       "type": 'string'
        |      },
        |      "host": {
        |       "type": 'string'
        |      },
        |      "instance": {
        |       "type": 'string'
        |      },
        |      "index_name": {
        |       "type": 'string'
        |      },
        |      "index_num": {
        |       "type": 'string'
        |      },
        |      "value": {
        |       "type": 'number'
        |      }
        |    }
        |   },
        |   "source": {
        |    "type": 'string'
        |   }
        |  }
        |}
        |""".stripMargin
    )
).withSchema(
  new Schema()
    .field("metric",
      Types.ROW_NAMED(
        Array("time_stamp", "event_time", "cluster", "host", "instance", "index_name", "index_num", "value"),
        Types.SQL_TIMESTAMP,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.BIG_DEC
      )
    ) //***如何指定上面row类型中time_stamp为rowtime
).inAppendMode()
  .registerTableSource("metricTable")


Reply | Threaded
Open this post in threaded view
|

Re: flink table api中无法设置子json中的列为rowtime

Jark
Administrator
1. 只支持指定顶层字段作为 rowtime,如果要使用 nested field 作为 rowtime,可以先使用计算列(仅在 DDL
上支持)生成顶层列。
2. Descriptor API 有很多问题,且缺失很多功能,不建议使用,建议使用 DDL。 Descriptor API 将在1.12 版本中重构。


Best,
Jark


On Thu, 17 Sep 2020 at 10:41, kylin <[hidden email]> wrote:

> flink版本1.7.2
>
> flink table api从kafka读取json数据,JsonSchema如下图所示
> 发现rowtime无法从子json中的字段指定,麻烦帮忙确认下rowtime是否只能从顶层的字段来指定?
> tableEnv.connect(
>   new Kafka()
>     .version("0.10")
>     .topic(topic_in)
>     .property("bootstrap.servers", brokers)
>     .property("group.id", "TableApiT2")
>     .startFromLatest()
> ).withFormat(
>   new Json()
>     .jsonSchema(
>       """
>         |{
>         |  "type": 'object',
>         |  "properties": {
>         |   "metric": {
>         |    "type": 'object',
>         |         "properties": {
>         |           "time_stamp": {
>         |       "type": 'string',
>         |       format: 'date-time'
>         |      },
>         |      "event_time": {
>         |       "type": 'string'
>         |      },
>         |      "cluster": {
>         |       "type": 'string'
>         |      },
>         |      "host": {
>         |       "type": 'string'
>         |      },
>         |      "instance": {
>         |       "type": 'string'
>         |      },
>         |      "index_name": {
>         |       "type": 'string'
>         |      },
>         |      "index_num": {
>         |       "type": 'string'
>         |      },
>         |      "value": {
>         |       "type": 'number'
>         |      }
>         |    }
>         |   },
>         |   "source": {
>         |    "type": 'string'
>         |   }
>         |  }
>         |}
>         |""".stripMargin
>     )
> ).withSchema(
>   new Schema()
>     .field("metric",
>       Types.ROW_NAMED(
>         Array("time_stamp", "event_time", "cluster", "host", "instance",
> "index_name", "index_num", "value"),
>         Types.SQL_TIMESTAMP,
>         Types.STRING,
>         Types.STRING,
>         Types.STRING,
>         Types.STRING,
>         Types.STRING,
>         Types.STRING,
>         Types.BIG_DEC
>       )
>     ) //***如何指定上面row类型中time_stamp为rowtime
> ).inAppendMode()
>   .registerTableSource("metricTable")
>
>
>