Flink SQL 嵌套 nested Json 解析

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL 嵌套 nested Json 解析

macia kk
有哪位大佬帮我看下,谢谢


尝试了很久,还是无法解析嵌套结构的Json

Error

Caused by: org.apache.flink.table.api.ValidationException: SQL
validation failed. From line 4, column 9 to line 4, column 31: Column
'data.transaction_type' not found in any table
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)


嵌套Json 定义的 format 和 schema 如下:

      .withFormat(new Json()
        .jsonSchema(
         """{type: 'object',
           |  properties: {
           |      database: {
           |          type: 'string'
           |      },
           |      table: {
           |          type: 'string'
           |      },
           |      maxwell_ts: {
           |          type: 'integer'
           |      },
           |      data: {
           |          type: 'object',
           |          properties :{
           |              reference_id :{
           |                  type: 'string'
           |              },
           |              transaction_type :{
           |                  type: 'integer'
           |              },
           |              merchant_id :{
           |                  type: 'integer'
           |              },
           |              create_time :{
           |                  type: 'integer'
           |              },
           |              status :{
           |                  type: 'integer'
           |              }
           |          }
           |      }
           |   }
           | }
         """.stripMargin.replaceAll("\n", " ")
         )
      )
      .withSchema(new Schema()
        .field("table", STRING())
        .field("database", STRING())
        .field("data", ROW(FIELD("reference_id",STRING()),
FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
FIELD("status",INT())))
        //.field("event_time", BIGINT())
        //  .from("maxwell_ts")
        //.rowtime(new Rowtime()
        //  //.timestampsFromField("ts" * 1000)
        //  .timestampsFromField("ts")
        //  .watermarksPeriodicBounded(60000)
        //)
      )


    bsTableEnv.sqlUpdate("""INSERT INTO yyyyy
                           | SELECT `table`, `database`
                           |        `data.reference_id`,
                           |        `data.transaction_type`,
                           |        `data.merchant_id`,
                           |        `data.create_time`,
                           |        `data.status`
                           | FROM xxxx""".stripMargin)
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL 嵌套 nested Json 解析

Leonard Xu
Hi, kk

使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看


祝好,
Leonard Xu


> 在 2020年5月26日,01:26,macia kk <[hidden email]> 写道:
>
> 有哪位大佬帮我看下,谢谢
>
>
> 尝试了很久,还是无法解析嵌套结构的Json
>
> Error
>
> Caused by: org.apache.flink.table.api.ValidationException: SQL
> validation failed. From line 4, column 9 to line 4, column 31: Column
> 'data.transaction_type' not found in any table
>    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
>    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
>    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
>    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
>    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>    at java.lang.reflect.Method.invoke(Method.java:498)
>    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>
>
> 嵌套Json 定义的 format 和 schema 如下:
>
>      .withFormat(new Json()
>        .jsonSchema(
>         """{type: 'object',
>           |  properties: {
>           |      database: {
>           |          type: 'string'
>           |      },
>           |      table: {
>           |          type: 'string'
>           |      },
>           |      maxwell_ts: {
>           |          type: 'integer'
>           |      },
>           |      data: {
>           |          type: 'object',
>           |          properties :{
>           |              reference_id :{
>           |                  type: 'string'
>           |              },
>           |              transaction_type :{
>           |                  type: 'integer'
>           |              },
>           |              merchant_id :{
>           |                  type: 'integer'
>           |              },
>           |              create_time :{
>           |                  type: 'integer'
>           |              },
>           |              status :{
>           |                  type: 'integer'
>           |              }
>           |          }
>           |      }
>           |   }
>           | }
>         """.stripMargin.replaceAll("\n", " ")
>         )
>      )
>      .withSchema(new Schema()
>        .field("table", STRING())
>        .field("database", STRING())
>        .field("data", ROW(FIELD("reference_id",STRING()),
> FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
> FIELD("status",INT())))
>        //.field("event_time", BIGINT())
>        //  .from("maxwell_ts")
>        //.rowtime(new Rowtime()
>        //  //.timestampsFromField("ts" * 1000)
>        //  .timestampsFromField("ts")
>        //  .watermarksPeriodicBounded(60000)
>        //)
>      )
>
>
>    bsTableEnv.sqlUpdate("""INSERT INTO yyyyy
>                           | SELECT `table`, `database`
>                           |        `data.reference_id`,
>                           |        `data.transaction_type`,
>                           |        `data.merchant_id`,
>                           |        `data.create_time`,
>                           |        `data.status`
>                           | FROM xxxx""".stripMargin)

Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL 嵌套 nested Json 解析

macia kk
Flink version: 1.10

Json:
```j
{
    "database":"main_db",
    "maxwell_ts":1590416550358000,
    "table":"transaction_tab",
    "data":{
        "transaction_sn":"8888",
        "parent_id":0,
        "user_id":333,
        "amount":555,
        "reference_id":"666",
        "status":3,
        "transaction_type":3,
        "merchant_id":2,
        "update_time":1590416550,
        "create_time":1590416550
    }
}
```

我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame



Leonard Xu <[hidden email]> 于2020年5月26日周二 上午8:58写道:

> Hi, kk
>
> 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看
>
>
> 祝好,
> Leonard Xu
>
>
> > 在 2020年5月26日,01:26,macia kk <[hidden email]> 写道:
> >
> > 有哪位大佬帮我看下,谢谢
> >
> >
> > 尝试了很久,还是无法解析嵌套结构的Json
> >
> > Error
> >
> > Caused by: org.apache.flink.table.api.ValidationException: SQL
> > validation failed. From line 4, column 9 to line 4, column 31: Column
> > 'data.transaction_type' not found in any table
> >    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> >    at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> >    at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> >    at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> >    at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> >    at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> >    at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> >    at
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
> >    at
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
> >    at
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
> >    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >    at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >    at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >    at java.lang.reflect.Method.invoke(Method.java:498)
> >    at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> >
> >
> > 嵌套Json 定义的 format 和 schema 如下:
> >
> >      .withFormat(new Json()
> >        .jsonSchema(
> >         """{type: 'object',
> >           |  properties: {
> >           |      database: {
> >           |          type: 'string'
> >           |      },
> >           |      table: {
> >           |          type: 'string'
> >           |      },
> >           |      maxwell_ts: {
> >           |          type: 'integer'
> >           |      },
> >           |      data: {
> >           |          type: 'object',
> >           |          properties :{
> >           |              reference_id :{
> >           |                  type: 'string'
> >           |              },
> >           |              transaction_type :{
> >           |                  type: 'integer'
> >           |              },
> >           |              merchant_id :{
> >           |                  type: 'integer'
> >           |              },
> >           |              create_time :{
> >           |                  type: 'integer'
> >           |              },
> >           |              status :{
> >           |                  type: 'integer'
> >           |              }
> >           |          }
> >           |      }
> >           |   }
> >           | }
> >         """.stripMargin.replaceAll("\n", " ")
> >         )
> >      )
> >      .withSchema(new Schema()
> >        .field("table", STRING())
> >        .field("database", STRING())
> >        .field("data", ROW(FIELD("reference_id",STRING()),
> > FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
> > FIELD("status",INT())))
> >        //.field("event_time", BIGINT())
> >        //  .from("maxwell_ts")
> >        //.rowtime(new Rowtime()
> >        //  //.timestampsFromField("ts" * 1000)
> >        //  .timestampsFromField("ts")
> >        //  .watermarksPeriodicBounded(60000)
> >        //)
> >      )
> >
> >
> >    bsTableEnv.sqlUpdate("""INSERT INTO yyyyy
> >                           | SELECT `table`, `database`
> >                           |        `data.reference_id`,
> >                           |        `data.transaction_type`,
> >                           |        `data.merchant_id`,
> >                           |        `data.create_time`,
> >                           |        `data.status`
> >                           | FROM xxxx""".stripMargin)
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL 嵌套 nested Json 解析

macia kk
Flink version: 1.10

Json:

{
    "database":"main_db",
    "maxwell_ts":1590416550358000,
    "table":"transaction_tab",
    "data":{
        "transaction_sn":"8888",
        "parent_id":0,
        "user_id":333,
        "amount":555,
        "reference_id":"666",
        "status":3,
        "transaction_type":3,
        "merchant_id":2,
        "update_time":1590416550,
        "create_time":1590416550
    }}


我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame


macia kk <[hidden email]> 于2020年5月26日周二 上午9:34写道:

> Flink version: 1.10
>
> Json:
> ```j
> {
>     "database":"main_db",
>     "maxwell_ts":1590416550358000,
>     "table":"transaction_tab",
>     "data":{
>         "transaction_sn":"8888",
>         "parent_id":0,
>         "user_id":333,
>         "amount":555,
>         "reference_id":"666",
>         "status":3,
>         "transaction_type":3,
>         "merchant_id":2,
>         "update_time":1590416550,
>         "create_time":1590416550
>     }
> }
> ```
>
> 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
>
>
>
> Leonard Xu <[hidden email]> 于2020年5月26日周二 上午8:58写道:
>
>> Hi, kk
>>
>> 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看
>>
>>
>> 祝好,
>> Leonard Xu
>>
>>
>> > 在 2020年5月26日,01:26,macia kk <[hidden email]> 写道:
>> >
>> > 有哪位大佬帮我看下,谢谢
>> >
>> >
>> > 尝试了很久,还是无法解析嵌套结构的Json
>> >
>> > Error
>> >
>> > Caused by: org.apache.flink.table.api.ValidationException: SQL
>> > validation failed. From line 4, column 9 to line 4, column 31: Column
>> > 'data.transaction_type' not found in any table
>> >    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
>> >    at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>> >    at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
>> >    at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>> >    at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>> >    at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>> >    at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>> >    at
>> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
>> >    at
>> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
>> >    at
>> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
>> >    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >    at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >    at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >    at java.lang.reflect.Method.invoke(Method.java:498)
>> >    at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>> >
>> >
>> > 嵌套Json 定义的 format 和 schema 如下:
>> >
>> >      .withFormat(new Json()
>> >        .jsonSchema(
>> >         """{type: 'object',
>> >           |  properties: {
>> >           |      database: {
>> >           |          type: 'string'
>> >           |      },
>> >           |      table: {
>> >           |          type: 'string'
>> >           |      },
>> >           |      maxwell_ts: {
>> >           |          type: 'integer'
>> >           |      },
>> >           |      data: {
>> >           |          type: 'object',
>> >           |          properties :{
>> >           |              reference_id :{
>> >           |                  type: 'string'
>> >           |              },
>> >           |              transaction_type :{
>> >           |                  type: 'integer'
>> >           |              },
>> >           |              merchant_id :{
>> >           |                  type: 'integer'
>> >           |              },
>> >           |              create_time :{
>> >           |                  type: 'integer'
>> >           |              },
>> >           |              status :{
>> >           |                  type: 'integer'
>> >           |              }
>> >           |          }
>> >           |      }
>> >           |   }
>> >           | }
>> >         """.stripMargin.replaceAll("\n", " ")
>> >         )
>> >      )
>> >      .withSchema(new Schema()
>> >        .field("table", STRING())
>> >        .field("database", STRING())
>> >        .field("data", ROW(FIELD("reference_id",STRING()),
>> > FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
>> > FIELD("status",INT())))
>> >        //.field("event_time", BIGINT())
>> >        //  .from("maxwell_ts")
>> >        //.rowtime(new Rowtime()
>> >        //  //.timestampsFromField("ts" * 1000)
>> >        //  .timestampsFromField("ts")
>> >        //  .watermarksPeriodicBounded(60000)
>> >        //)
>> >      )
>> >
>> >
>> >    bsTableEnv.sqlUpdate("""INSERT INTO yyyyy
>> >                           | SELECT `table`, `database`
>> >                           |        `data.reference_id`,
>> >                           |        `data.transaction_type`,
>> >                           |        `data.merchant_id`,
>> >                           |        `data.create_time`,
>> >                           |        `data.status`
>> >                           | FROM xxxx""".stripMargin)
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL 嵌套 nested Json 解析

Benchao Li
Hi,

你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子:
create table my_source (
  database varchar,
  maxwell_ts bigint,
  table varchar,
  data row<
    transaction_sn varchar,
    parent_id int,
    user_id int,
    amount int,
    reference_id varchar,
    status int,
    transaction_type int,
    merchant_id int,
    update_time int,
    create_time int
  >
) with (
    ...
)

macia kk <[hidden email]> 于2020年5月26日周二 上午9:36写道:

> Flink version: 1.10
>
> Json:
>
> {
>     "database":"main_db",
>     "maxwell_ts":1590416550358000,
>     "table":"transaction_tab",
>     "data":{
>         "transaction_sn":"8888",
>         "parent_id":0,
>         "user_id":333,
>         "amount":555,
>         "reference_id":"666",
>         "status":3,
>         "transaction_type":3,
>         "merchant_id":2,
>         "update_time":1590416550,
>         "create_time":1590416550
>     }}
>
>
> 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
>
>
> macia kk <[hidden email]> 于2020年5月26日周二 上午9:34写道:
>
> > Flink version: 1.10
> >
> > Json:
> > ```j
> > {
> >     "database":"main_db",
> >     "maxwell_ts":1590416550358000,
> >     "table":"transaction_tab",
> >     "data":{
> >         "transaction_sn":"8888",
> >         "parent_id":0,
> >         "user_id":333,
> >         "amount":555,
> >         "reference_id":"666",
> >         "status":3,
> >         "transaction_type":3,
> >         "merchant_id":2,
> >         "update_time":1590416550,
> >         "create_time":1590416550
> >     }
> > }
> > ```
> >
> > 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
> >
> >
> >
> > Leonard Xu <[hidden email]> 于2020年5月26日周二 上午8:58写道:
> >
> >> Hi, kk
> >>
> >> 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看
> >>
> >>
> >> 祝好,
> >> Leonard Xu
> >>
> >>
> >> > 在 2020年5月26日,01:26,macia kk <[hidden email]> 写道:
> >> >
> >> > 有哪位大佬帮我看下,谢谢
> >> >
> >> >
> >> > 尝试了很久,还是无法解析嵌套结构的Json
> >> >
> >> > Error
> >> >
> >> > Caused by: org.apache.flink.table.api.ValidationException: SQL
> >> > validation failed. From line 4, column 9 to line 4, column 31: Column
> >> > 'data.transaction_type' not found in any table
> >> >    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> >> >    at
> >>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> >> >    at
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> >> >    at
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> >> >    at
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> >> >    at
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> >> >    at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> >> >    at
> >>
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
> >> >    at
> >>
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
> >> >    at
> >>
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
> >> >    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> >    at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> >    at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >    at java.lang.reflect.Method.invoke(Method.java:498)
> >> >    at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> >> >
> >> >
> >> > 嵌套Json 定义的 format 和 schema 如下:
> >> >
> >> >      .withFormat(new Json()
> >> >        .jsonSchema(
> >> >         """{type: 'object',
> >> >           |  properties: {
> >> >           |      database: {
> >> >           |          type: 'string'
> >> >           |      },
> >> >           |      table: {
> >> >           |          type: 'string'
> >> >           |      },
> >> >           |      maxwell_ts: {
> >> >           |          type: 'integer'
> >> >           |      },
> >> >           |      data: {
> >> >           |          type: 'object',
> >> >           |          properties :{
> >> >           |              reference_id :{
> >> >           |                  type: 'string'
> >> >           |              },
> >> >           |              transaction_type :{
> >> >           |                  type: 'integer'
> >> >           |              },
> >> >           |              merchant_id :{
> >> >           |                  type: 'integer'
> >> >           |              },
> >> >           |              create_time :{
> >> >           |                  type: 'integer'
> >> >           |              },
> >> >           |              status :{
> >> >           |                  type: 'integer'
> >> >           |              }
> >> >           |          }
> >> >           |      }
> >> >           |   }
> >> >           | }
> >> >         """.stripMargin.replaceAll("\n", " ")
> >> >         )
> >> >      )
> >> >      .withSchema(new Schema()
> >> >        .field("table", STRING())
> >> >        .field("database", STRING())
> >> >        .field("data", ROW(FIELD("reference_id",STRING()),
> >> > FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
> >> > FIELD("status",INT())))
> >> >        //.field("event_time", BIGINT())
> >> >        //  .from("maxwell_ts")
> >> >        //.rowtime(new Rowtime()
> >> >        //  //.timestampsFromField("ts" * 1000)
> >> >        //  .timestampsFromField("ts")
> >> >        //  .watermarksPeriodicBounded(60000)
> >> >        //)
> >> >      )
> >> >
> >> >
> >> >    bsTableEnv.sqlUpdate("""INSERT INTO yyyyy
> >> >                           | SELECT `table`, `database`
> >> >                           |        `data.reference_id`,
> >> >                           |        `data.transaction_type`,
> >> >                           |        `data.merchant_id`,
> >> >                           |        `data.create_time`,
> >> >                           |        `data.status`
> >> >                           | FROM xxxx""".stripMargin)
> >>
> >>
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

回复: Flink SQL 嵌套 nested Json 解析

claylin
这个问题我想问下,通过row解析出来的每条记录的event time怎么定义,是直接在每条记录里面定义吗


create table my_source (
&nbsp; database varchar,
&nbsp; maxwell_ts bigint,
&nbsp; table varchar,
&nbsp; data row<
&nbsp;&nbsp;&nbsp; transaction_sn varchar,
&nbsp;&nbsp;&nbsp; parent_id int,
&nbsp;&nbsp;&nbsp; user_id int,
&nbsp;&nbsp;&nbsp; amount int,
&nbsp;&nbsp;&nbsp; reference_id varchar,
&nbsp;&nbsp;&nbsp; status int,
&nbsp;&nbsp;&nbsp; transaction_type int,
&nbsp;&nbsp;&nbsp; merchant_id int,
&nbsp;&nbsp;&nbsp; update_time int,
&nbsp;&nbsp;&nbsp; create_time int
&nbsp; &nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS TIMESTAMP(3)),&nbsp; // 定义事件时间
&nbsp; &nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE
&nbsp; &gt;
) with (
&nbsp;&nbsp;&nbsp; ...
)


这样可以行吗


------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
发送时间:&nbsp;2020年5月26日(星期二) 上午9:55
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: Flink SQL 嵌套 nested Json 解析



Hi,

你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子:
create table my_source (
&nbsp; database varchar,
&nbsp; maxwell_ts bigint,
&nbsp; table varchar,
&nbsp; data row<
&nbsp;&nbsp;&nbsp; transaction_sn varchar,
&nbsp;&nbsp;&nbsp; parent_id int,
&nbsp;&nbsp;&nbsp; user_id int,
&nbsp;&nbsp;&nbsp; amount int,
&nbsp;&nbsp;&nbsp; reference_id varchar,
&nbsp;&nbsp;&nbsp; status int,
&nbsp;&nbsp;&nbsp; transaction_type int,
&nbsp;&nbsp;&nbsp; merchant_id int,
&nbsp;&nbsp;&nbsp; update_time int,
&nbsp;&nbsp;&nbsp; create_time int
&nbsp; &gt;
) with (
&nbsp;&nbsp;&nbsp; ...
)

macia kk <[hidden email]&gt; 于2020年5月26日周二 上午9:36写道:

&gt; Flink version: 1.10
&gt;
&gt; Json:
&gt;
&gt; {
&gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db",
&gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000,
&gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab",
&gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "transaction_sn":"8888",
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "reference_id":"666",
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "transaction_type":3,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "update_time":1590416550,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "create_time":1590416550
&gt;&nbsp;&nbsp;&nbsp;&nbsp; }}
&gt;
&gt;
&gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
&gt;
&gt;
&gt; macia kk <[hidden email]&gt; 于2020年5月26日周二 上午9:34写道:
&gt;
&gt; &gt; Flink version: 1.10
&gt; &gt;
&gt; &gt; Json:
&gt; &gt; ```j
&gt; &gt; {
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db",
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab",
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "transaction_sn":"8888",
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "reference_id":"666",
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "transaction_type":3,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "update_time":1590416550,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "create_time":1590416550
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt; &gt; }
&gt; &gt; ```
&gt; &gt;
&gt; &gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; Leonard Xu <[hidden email]&gt; 于2020年5月26日周二 上午8:58写道:
&gt; &gt;
&gt; &gt;&gt; Hi, kk
&gt; &gt;&gt;
&gt; &gt;&gt; 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看
&gt; &gt;&gt;
&gt; &gt;&gt;
&gt; &gt;&gt; 祝好,
&gt; &gt;&gt; Leonard Xu
&gt; &gt;&gt;
&gt; &gt;&gt;
&gt; &gt;&gt; &gt; 在 2020年5月26日,01:26,macia kk <[hidden email]&gt; 写道:
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt; 有哪位大佬帮我看下,谢谢
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt; 尝试了很久,还是无法解析嵌套结构的Json
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt; Error
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt; Caused by: org.apache.flink.table.api.ValidationException: SQL
&gt; &gt;&gt; &gt; validation failed. From line 4, column 9 to line 4, column 31: Column
&gt; &gt;&gt; &gt; 'data.transaction_type' not found in any table
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
&gt; &gt;&gt;
&gt; $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt; 嵌套Json 定义的 format 和 schema 如下:
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withFormat(new Json()
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .jsonSchema(
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; """{type: 'object',
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp; properties: {
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; database: {
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; table: {
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; maxwell_ts: {
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'integer'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; data: {
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'object',
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; properties :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; reference_id :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; transaction_type :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'integer'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; merchant_id :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'integer'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; create_time :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'integer'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; status :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'integer'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp; }
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; | }
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; """.stripMargin.replaceAll("\n", " ")
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withSchema(new Schema()
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .field("table", STRING())
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .field("database", STRING())
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .field("data", ROW(FIELD("reference_id",STRING()),
&gt; &gt;&gt; &gt; FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
&gt; &gt;&gt; &gt; FIELD("status",INT())))
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //.field("event_time", BIGINT())
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; .from("maxwell_ts")
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //.rowtime(new Rowtime()
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; //.timestampsFromField("ts" * 1000)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; .timestampsFromField("ts")
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; .watermarksPeriodicBounded(60000)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; bsTableEnv.sqlUpdate("""INSERT INTO yyyyy
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; | SELECT `table`, `database`
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.reference_id`,
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.transaction_type`,
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.merchant_id`,
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.create_time`,
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.status`
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; | FROM xxxx""".stripMargin)
&gt; &gt;&gt;
&gt; &gt;&gt;
&gt;


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL 嵌套 nested Json 解析

Benchao Li
嗯,直接在DDL里面用WATERMARK语法来定义就可以,你这个定义我感觉是没有问题的。

claylin <[hidden email]> 于2020年5月26日周二 上午10:07写道:

> 这个问题我想问下,通过row解析出来的每条记录的event time怎么定义,是直接在每条记录里面定义吗
>
>
> create table my_source (
> &nbsp; database varchar,
> &nbsp; maxwell_ts bigint,
> &nbsp; table varchar,
> &nbsp; data row<
> &nbsp;&nbsp;&nbsp; transaction_sn varchar,
> &nbsp;&nbsp;&nbsp; parent_id int,
> &nbsp;&nbsp;&nbsp; user_id int,
> &nbsp;&nbsp;&nbsp; amount int,
> &nbsp;&nbsp;&nbsp; reference_id varchar,
> &nbsp;&nbsp;&nbsp; status int,
> &nbsp;&nbsp;&nbsp; transaction_type int,
> &nbsp;&nbsp;&nbsp; merchant_id int,
> &nbsp;&nbsp;&nbsp; update_time int,
> &nbsp;&nbsp;&nbsp; create_time int
> &nbsp; &nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS
> TIMESTAMP(3)),&nbsp; // 定义事件时间
> &nbsp; &nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE
> &nbsp; &gt;
> ) with (
> &nbsp;&nbsp;&nbsp; ...
> )
>
>
> 这样可以行吗
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年5月26日(星期二) 上午9:55
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: Flink SQL 嵌套 nested Json 解析
>
>
>
> Hi,
>
> 你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子:
> create table my_source (
> &nbsp; database varchar,
> &nbsp; maxwell_ts bigint,
> &nbsp; table varchar,
> &nbsp; data row<
> &nbsp;&nbsp;&nbsp; transaction_sn varchar,
> &nbsp;&nbsp;&nbsp; parent_id int,
> &nbsp;&nbsp;&nbsp; user_id int,
> &nbsp;&nbsp;&nbsp; amount int,
> &nbsp;&nbsp;&nbsp; reference_id varchar,
> &nbsp;&nbsp;&nbsp; status int,
> &nbsp;&nbsp;&nbsp; transaction_type int,
> &nbsp;&nbsp;&nbsp; merchant_id int,
> &nbsp;&nbsp;&nbsp; update_time int,
> &nbsp;&nbsp;&nbsp; create_time int
> &nbsp; &gt;
> ) with (
> &nbsp;&nbsp;&nbsp; ...
> )
>
> macia kk <[hidden email]&gt; 于2020年5月26日周二 上午9:36写道:
>
> &gt; Flink version: 1.10
> &gt;
> &gt; Json:
> &gt;
> &gt; {
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db",
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab",
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "transaction_sn":"8888",
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "reference_id":"666",
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "transaction_type":3,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "update_time":1590416550,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "create_time":1590416550
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; }}
> &gt;
> &gt;
> &gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
> &gt;
> &gt;
> &gt; macia kk <[hidden email]&gt; 于2020年5月26日周二 上午9:34写道:
> &gt;
> &gt; &gt; Flink version: 1.10
> &gt; &gt;
> &gt; &gt; Json:
> &gt; &gt; ```j
> &gt; &gt; {
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db",
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab",
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "transaction_sn":"8888",
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "reference_id":"666",
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "transaction_type":3,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "update_time":1590416550,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "create_time":1590416550
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; }
> &gt; &gt; }
> &gt; &gt; ```
> &gt; &gt;
> &gt; &gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
> &gt; &gt;
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; Leonard Xu <[hidden email]&gt; 于2020年5月26日周二 上午8:58写道:
> &gt; &gt;
> &gt; &gt;&gt; Hi, kk
> &gt; &gt;&gt;
> &gt; &gt;&gt; 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看
> &gt; &gt;&gt;
> &gt; &gt;&gt;
> &gt; &gt;&gt; 祝好,
> &gt; &gt;&gt; Leonard Xu
> &gt; &gt;&gt;
> &gt; &gt;&gt;
> &gt; &gt;&gt; &gt; 在 2020年5月26日,01:26,macia kk <[hidden email]&gt; 写道:
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt; 有哪位大佬帮我看下,谢谢
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt; 尝试了很久,还是无法解析嵌套结构的Json
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt; Error
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt; Caused by:
> org.apache.flink.table.api.ValidationException: SQL
> &gt; &gt;&gt; &gt; validation failed. From line 4, column 9 to line 4,
> column 31: Column
> &gt; &gt;&gt; &gt; 'data.transaction_type' not found in any table
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> &gt; &gt;&gt;
> &gt;
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> java.lang.reflect.Method.invoke(Method.java:498)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt; 嵌套Json 定义的 format 和 schema 如下:
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withFormat(new Json()
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .jsonSchema(
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> """{type: 'object',
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;
> properties: {
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; database: {
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; table: {
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; maxwell_ts: {
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'integer'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; data: {
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'object',
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; properties :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> reference_id :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> type: 'string'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> transaction_type :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> type: 'integer'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> merchant_id :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> type: 'integer'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> create_time :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> type: 'integer'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> status :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> type: 'integer'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> }
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp; }
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; | }
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> """.stripMargin.replaceAll("\n", " ")
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withSchema(new Schema()
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field("table", STRING())
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field("database", STRING())
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field("data", ROW(FIELD("reference_id",STRING()),
> &gt; &gt;&gt; &gt; FIELD("transaction_type",INT()),
> FIELD("merchant_id",INT()),
> &gt; &gt;&gt; &gt; FIELD("status",INT())))
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> //.field("event_time", BIGINT())
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp;
> .from("maxwell_ts")
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> //.rowtime(new Rowtime()
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp;
> //.timestampsFromField("ts" * 1000)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp;
> .timestampsFromField("ts")
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp;
> .watermarksPeriodicBounded(60000)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; bsTableEnv.sqlUpdate("""INSERT INTO
> yyyyy
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> | SELECT `table`, `database`
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.reference_id`,
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.transaction_type`,
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.merchant_id`,
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.create_time`,
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.status`
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> | FROM xxxx""".stripMargin)
> &gt; &gt;&gt;
> &gt; &gt;&gt;
> &gt;
>
>
> --
>
> Best,
> Benchao Li



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

回复: Flink SQL 嵌套 nested Json 解析

claylin
嗯 谢谢 我试下看下




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
发送时间:&nbsp;2020年5月26日(星期二) 上午10:09
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: Flink SQL 嵌套 nested Json 解析



嗯,直接在DDL里面用WATERMARK语法来定义就可以,你这个定义我感觉是没有问题的。

claylin <[hidden email]&gt; 于2020年5月26日周二 上午10:07写道:

&gt; 这个问题我想问下,通过row解析出来的每条记录的event time怎么定义,是直接在每条记录里面定义吗
&gt;
&gt;
&gt; create table my_source (
&gt; &amp;nbsp; database varchar,
&gt; &amp;nbsp; maxwell_ts bigint,
&gt; &amp;nbsp; table varchar,
&gt; &amp;nbsp; data row<
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_sn varchar,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; parent_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; user_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; amount int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; reference_id varchar,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; status int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_type int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; merchant_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; update_time int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; create_time int
&gt; &amp;nbsp; &amp;nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS
&gt; TIMESTAMP(3)),&amp;nbsp; // 定义事件时间
&gt; &amp;nbsp; &amp;nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE
&gt; &amp;nbsp; &amp;gt;
&gt; ) with (
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; ...
&gt; )
&gt;
&gt;
&gt; 这样可以行吗
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年5月26日(星期二) 上午9:55
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: Flink SQL 嵌套 nested Json 解析
&gt;
&gt;
&gt;
&gt; Hi,
&gt;
&gt; 你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子:
&gt; create table my_source (
&gt; &amp;nbsp; database varchar,
&gt; &amp;nbsp; maxwell_ts bigint,
&gt; &amp;nbsp; table varchar,
&gt; &amp;nbsp; data row<
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_sn varchar,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; parent_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; user_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; amount int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; reference_id varchar,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; status int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_type int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; merchant_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; update_time int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; create_time int
&gt; &amp;nbsp; &amp;gt;
&gt; ) with (
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; ...
&gt; )
&gt;
&gt; macia kk <[hidden email]&amp;gt; 于2020年5月26日周二 上午9:36写道:
&gt;
&gt; &amp;gt; Flink version: 1.10
&gt; &amp;gt;
&gt; &amp;gt; Json:
&gt; &amp;gt;
&gt; &amp;gt; {
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "database":"main_db",
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "maxwell_ts":1590416550358000,
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "table":"transaction_tab",
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "data":{
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "transaction_sn":"8888",
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "parent_id":0,
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "user_id":333,
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "amount":555,
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "reference_id":"666",
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "status":3,
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "transaction_type":3,
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "merchant_id":2,
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "update_time":1590416550,
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "create_time":1590416550
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }}
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; macia kk <[hidden email]&amp;gt; 于2020年5月26日周二 上午9:34写道:
&gt; &amp;gt;
&gt; &amp;gt; &amp;gt; Flink version: 1.10
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; Json:
&gt; &amp;gt; &amp;gt; ```j
&gt; &amp;gt; &amp;gt; {
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "database":"main_db",
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "maxwell_ts":1590416550358000,
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "table":"transaction_tab",
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "data":{
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "transaction_sn":"8888",
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "parent_id":0,
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "user_id":333,
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "amount":555,
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "reference_id":"666",
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "status":3,
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "transaction_type":3,
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "merchant_id":2,
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "update_time":1590416550,
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "create_time":1590416550
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }
&gt; &amp;gt; &amp;gt; }
&gt; &amp;gt; &amp;gt; ```
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; Leonard Xu <[hidden email]&amp;gt; 于2020年5月26日周二 上午8:58写道:
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; Hi, kk
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; 祝好,
&gt; &amp;gt; &amp;gt;&amp;gt; Leonard Xu
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; 在 2020年5月26日,01:26,macia kk <[hidden email]&amp;gt; 写道:
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; 有哪位大佬帮我看下,谢谢
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; 尝试了很久,还是无法解析嵌套结构的Json
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; Error
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; Caused by:
&gt; org.apache.flink.table.api.ValidationException: SQL
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; validation failed. From line 4, column 9 to line 4,
&gt; column 31: Column
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; 'data.transaction_type' not found in any table
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; java.lang.reflect.Method.invoke(Method.java:498)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; 嵌套Json 定义的 format 和 schema 如下:
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .withFormat(new Json()
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .jsonSchema(
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; """{type: 'object',
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; |&amp;nbsp;
&gt; properties: {
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; database: {
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; type: 'string'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; table: {
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; type: 'string'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; maxwell_ts: {
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; type: 'integer'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; data: {
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; type: 'object',
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; properties :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; reference_id :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; type: 'string'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; transaction_type :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; type: 'integer'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; merchant_id :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; type: 'integer'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; create_time :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; type: 'integer'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; status :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; type: 'integer'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; }
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp; }
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; | }
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; """.stripMargin.replaceAll("\n", " ")
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; )
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; )
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .withSchema(new Schema()
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field("table", STRING())
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field("database", STRING())
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field("data", ROW(FIELD("reference_id",STRING()),
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; FIELD("transaction_type",INT()),
&gt; FIELD("merchant_id",INT()),
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; FIELD("status",INT())))
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; //.field("event_time", BIGINT())
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; //&amp;nbsp;
&gt; .from("maxwell_ts")
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; //.rowtime(new Rowtime()
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; //&amp;nbsp;
&gt; //.timestampsFromField("ts" * 1000)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; //&amp;nbsp;
&gt; .timestampsFromField("ts")
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; //&amp;nbsp;
&gt; .watermarksPeriodicBounded(60000)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; //)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; )
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; bsTableEnv.sqlUpdate("""INSERT INTO
&gt; yyyyy
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; | SELECT `table`, `database`
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; `data.reference_id`,
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; `data.transaction_type`,
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; `data.merchant_id`,
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; `data.create_time`,
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; `data.status`
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; | FROM xxxx""".stripMargin)
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt;
&gt;
&gt; --
&gt;
&gt; Best,
&gt; Benchao Li



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL 嵌套 nested Json 解析

LakeShen
Hi,

关于 Json 的解析,当你的 Json 里面的一个字段一个镶嵌类型的话,可以将其定义为一个 row,row 里面还可以定义 row 字段。

注意 row 里面的字段名称要和原始json 里面的字段一致。

Best,
LakeShen

claylin <[hidden email]> 于2020年5月26日周二 上午10:17写道:

> 嗯 谢谢 我试下看下
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年5月26日(星期二) 上午10:09
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: Flink SQL 嵌套 nested Json 解析
>
>
>
> 嗯,直接在DDL里面用WATERMARK语法来定义就可以,你这个定义我感觉是没有问题的。
>
> claylin <[hidden email]&gt; 于2020年5月26日周二 上午10:07写道:
>
> &gt; 这个问题我想问下,通过row解析出来的每条记录的event time怎么定义,是直接在每条记录里面定义吗
> &gt;
> &gt;
> &gt; create table my_source (
> &gt; &amp;nbsp; database varchar,
> &gt; &amp;nbsp; maxwell_ts bigint,
> &gt; &amp;nbsp; table varchar,
> &gt; &amp;nbsp; data row<
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_sn varchar,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; parent_id int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; user_id int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; amount int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; reference_id varchar,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; status int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_type int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; merchant_id int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; update_time int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; create_time int
> &gt; &amp;nbsp; &amp;nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS
> &gt; TIMESTAMP(3)),&amp;nbsp; // 定义事件时间
> &gt; &amp;nbsp; &amp;nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE
> &gt; &amp;nbsp; &amp;gt;
> &gt; ) with (
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; ...
> &gt; )
> &gt;
> &gt;
> &gt; 这样可以行吗
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2020年5月26日(星期二) 上午9:55
> &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: Flink SQL 嵌套 nested Json 解析
> &gt;
> &gt;
> &gt;
> &gt; Hi,
> &gt;
> &gt; 你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子:
> &gt; create table my_source (
> &gt; &amp;nbsp; database varchar,
> &gt; &amp;nbsp; maxwell_ts bigint,
> &gt; &amp;nbsp; table varchar,
> &gt; &amp;nbsp; data row<
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_sn varchar,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; parent_id int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; user_id int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; amount int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; reference_id varchar,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; status int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_type int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; merchant_id int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; update_time int,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; create_time int
> &gt; &amp;nbsp; &amp;gt;
> &gt; ) with (
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; ...
> &gt; )
> &gt;
> &gt; macia kk <[hidden email]&amp;gt; 于2020年5月26日周二 上午9:36写道:
> &gt;
> &gt; &amp;gt; Flink version: 1.10
> &gt; &amp;gt;
> &gt; &amp;gt; Json:
> &gt; &amp;gt;
> &gt; &amp;gt; {
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "database":"main_db",
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "maxwell_ts":1590416550358000,
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "table":"transaction_tab",
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "data":{
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; "transaction_sn":"8888",
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "parent_id":0,
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "user_id":333,
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "amount":555,
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "reference_id":"666",
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "status":3,
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "transaction_type":3,
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "merchant_id":2,
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; "update_time":1590416550,
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; "create_time":1590416550
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }}
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; macia kk <[hidden email]&amp;gt; 于2020年5月26日周二 上午9:34写道:
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;gt; Flink version: 1.10
> &gt; &amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt; Json:
> &gt; &amp;gt; &amp;gt; ```j
> &gt; &amp;gt; &amp;gt; {
> &gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "database":"main_db",
> &gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "maxwell_ts":1590416550358000,
> &gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "table":"transaction_tab",
> &gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "data":{
> &gt; &amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; "transaction_sn":"8888",
> &gt; &amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "parent_id":0,
> &gt; &amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "user_id":333,
> &gt; &amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "amount":555,
> &gt; &amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; "reference_id":"666",
> &gt; &amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "status":3,
> &gt; &amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; "transaction_type":3,
> &gt; &amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> "merchant_id":2,
> &gt; &amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; "update_time":1590416550,
> &gt; &amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; "create_time":1590416550
> &gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }
> &gt; &amp;gt; &amp;gt; }
> &gt; &amp;gt; &amp;gt; ```
> &gt; &amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
> &gt; &amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt; Leonard Xu <[hidden email]&amp;gt;
> 于2020年5月26日周二 上午8:58写道:
> &gt; &amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; Hi, kk
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt;
> 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; 祝好,
> &gt; &amp;gt; &amp;gt;&amp;gt; Leonard Xu
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; 在 2020年5月26日,01:26,macia kk <
> [hidden email]&amp;gt; 写道:
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; 有哪位大佬帮我看下,谢谢
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; 尝试了很久,还是无法解析嵌套结构的Json
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; Error
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; Caused by:
> &gt; org.apache.flink.table.api.ValidationException: SQL
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; validation failed. From line 4,
> column 9 to line 4,
> &gt; column 31: Column
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; 'data.transaction_type' not found
> in any table
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; java.lang.reflect.Method.invoke(Method.java:498)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; 嵌套Json 定义的 format 和 schema 如下:
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .withFormat(new
> Json()
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> .jsonSchema(
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; """{type: 'object',
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> |&amp;nbsp;
> &gt; properties: {
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; database: {
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> type: 'string'
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; },
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; table: {
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> type: 'string'
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; },
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; maxwell_ts: {
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> type: 'integer'
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; },
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; data: {
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> type: 'object',
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> properties :{
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; reference_id :{
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; type: 'string'
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; },
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; transaction_type :{
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; type: 'integer'
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; },
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; merchant_id :{
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; type: 'integer'
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; },
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; create_time :{
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; type: 'integer'
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; },
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; status :{
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; type: 'integer'
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; }
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> }
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; |&amp;nbsp;&amp;nbsp; }
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> | }
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; """.stripMargin.replaceAll("\n", " ")
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> )
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; )
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .withSchema(new
> Schema()
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; .field("table", STRING())
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; .field("database", STRING())
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; .field("data", ROW(FIELD("reference_id",STRING()),
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; FIELD("transaction_type",INT()),
> &gt; FIELD("merchant_id",INT()),
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; FIELD("status",INT())))
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; //.field("event_time", BIGINT())
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> //&amp;nbsp;
> &gt; .from("maxwell_ts")
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; //.rowtime(new Rowtime()
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> //&amp;nbsp;
> &gt; //.timestampsFromField("ts" * 1000)
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> //&amp;nbsp;
> &gt; .timestampsFromField("ts")
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> //&amp;nbsp;
> &gt; .watermarksPeriodicBounded(60000)
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> //)
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; )
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> bsTableEnv.sqlUpdate("""INSERT INTO
> &gt; yyyyy
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; | SELECT `table`, `database`
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> `data.reference_id`,
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> `data.transaction_type`,
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> `data.merchant_id`,
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> `data.create_time`,
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> `data.status`
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; | FROM xxxx""".stripMargin)
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt; &amp;gt;&amp;gt;
> &gt; &amp;gt;
> &gt;
> &gt;
> &gt; --
> &gt;
> &gt; Best,
> &gt; Benchao Li
>
>
>
> --
>
> Best,
> Benchao Li