有哪位大佬帮我看下,谢谢
尝试了很久,还是无法解析嵌套结构的Json Error Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 9 to line 4, column 31: Column 'data.transaction_type' not found in any table at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) 嵌套Json 定义的 format 和 schema 如下: .withFormat(new Json() .jsonSchema( """{type: 'object', | properties: { | database: { | type: 'string' | }, | table: { | type: 'string' | }, | maxwell_ts: { | type: 'integer' | }, | data: { | type: 'object', | properties :{ | reference_id :{ | type: 'string' | }, | transaction_type :{ | type: 'integer' | }, | merchant_id :{ | type: 'integer' | }, | create_time :{ | type: 'integer' | }, | status :{ | type: 'integer' | } | } | } | } | } """.stripMargin.replaceAll("\n", " ") ) ) .withSchema(new Schema() .field("table", STRING()) .field("database", STRING()) .field("data", ROW(FIELD("reference_id",STRING()), FIELD("transaction_type",INT()), FIELD("merchant_id",INT()), FIELD("status",INT()))) //.field("event_time", BIGINT()) // .from("maxwell_ts") //.rowtime(new Rowtime() // //.timestampsFromField("ts" * 1000) // .timestampsFromField("ts") // .watermarksPeriodicBounded(60000) //) ) bsTableEnv.sqlUpdate("""INSERT INTO yyyyy | SELECT `table`, `database` | `data.reference_id`, | `data.transaction_type`, | `data.merchant_id`, | `data.create_time`, | `data.status` | FROM xxxx""".stripMargin) |
Hi, kk
使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看 祝好, Leonard Xu > 在 2020年5月26日,01:26,macia kk <[hidden email]> 写道: > > 有哪位大佬帮我看下,谢谢 > > > 尝试了很久,还是无法解析嵌套结构的Json > > Error > > Caused by: org.apache.flink.table.api.ValidationException: SQL > validation failed. From line 4, column 9 to line 4, column 31: Column > 'data.transaction_type' not found in any table > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) > at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) > at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > > > 嵌套Json 定义的 format 和 schema 如下: > > .withFormat(new Json() > .jsonSchema( > """{type: 'object', > | properties: { > | database: { > | type: 'string' > | }, > | table: { > | type: 'string' > | }, > | maxwell_ts: { > | type: 'integer' > | }, > | data: { > | type: 'object', > | properties :{ > | reference_id :{ > | type: 'string' > | }, > | transaction_type :{ > | type: 'integer' > | }, > | merchant_id :{ > | type: 'integer' > | }, > | create_time :{ > | type: 'integer' > | }, > | status :{ > | type: 'integer' > | } > | } > | } > | } > | } > """.stripMargin.replaceAll("\n", " ") > ) > ) > .withSchema(new Schema() > .field("table", STRING()) > .field("database", STRING()) > .field("data", ROW(FIELD("reference_id",STRING()), > FIELD("transaction_type",INT()), FIELD("merchant_id",INT()), > FIELD("status",INT()))) > //.field("event_time", BIGINT()) > // .from("maxwell_ts") > //.rowtime(new Rowtime() > // //.timestampsFromField("ts" * 1000) > // .timestampsFromField("ts") > // .watermarksPeriodicBounded(60000) > //) > ) > > > bsTableEnv.sqlUpdate("""INSERT INTO yyyyy > | SELECT `table`, `database` > | `data.reference_id`, > | `data.transaction_type`, > | `data.merchant_id`, > | `data.create_time`, > | `data.status` > | FROM xxxx""".stripMargin) |
Flink version: 1.10
Json: ```j { "database":"main_db", "maxwell_ts":1590416550358000, "table":"transaction_tab", "data":{ "transaction_sn":"8888", "parent_id":0, "user_id":333, "amount":555, "reference_id":"666", "status":3, "transaction_type":3, "merchant_id":2, "update_time":1590416550, "create_time":1590416550 } } ``` 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame Leonard Xu <[hidden email]> 于2020年5月26日周二 上午8:58写道: > Hi, kk > > 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看 > > > 祝好, > Leonard Xu > > > > 在 2020年5月26日,01:26,macia kk <[hidden email]> 写道: > > > > 有哪位大佬帮我看下,谢谢 > > > > > > 尝试了很久,还是无法解析嵌套结构的Json > > > > Error > > > > Caused by: org.apache.flink.table.api.ValidationException: SQL > > validation failed. From line 4, column 9 to line 4, column 31: Column > > 'data.transaction_type' not found in any table > > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > at > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) > > at > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) > > at > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > > > > > > 嵌套Json 定义的 format 和 schema 如下: > > > > .withFormat(new Json() > > .jsonSchema( > > """{type: 'object', > > | properties: { > > | database: { > > | type: 'string' > > | }, > > | table: { > > | type: 'string' > > | }, > > | maxwell_ts: { > > | type: 'integer' > > | }, > > | data: { > > | type: 'object', > > | properties :{ > > | reference_id :{ > > | type: 'string' > > | }, > > | transaction_type :{ > > | type: 'integer' > > | }, > > | merchant_id :{ > > | type: 'integer' > > | }, > > | create_time :{ > > | type: 'integer' > > | }, > > | status :{ > > | type: 'integer' > > | } > > | } > > | } > > | } > > | } > > """.stripMargin.replaceAll("\n", " ") > > ) > > ) > > .withSchema(new Schema() > > .field("table", STRING()) > > .field("database", STRING()) > > .field("data", ROW(FIELD("reference_id",STRING()), > > FIELD("transaction_type",INT()), FIELD("merchant_id",INT()), > > FIELD("status",INT()))) > > //.field("event_time", BIGINT()) > > // .from("maxwell_ts") > > //.rowtime(new Rowtime() > > // //.timestampsFromField("ts" * 1000) > > // .timestampsFromField("ts") > > // .watermarksPeriodicBounded(60000) > > //) > > ) > > > > > > bsTableEnv.sqlUpdate("""INSERT INTO yyyyy > > | SELECT `table`, `database` > > | `data.reference_id`, > > | `data.transaction_type`, > > | `data.merchant_id`, > > | `data.create_time`, > > | `data.status` > > | FROM xxxx""".stripMargin) > > |
Flink version: 1.10
Json: { "database":"main_db", "maxwell_ts":1590416550358000, "table":"transaction_tab", "data":{ "transaction_sn":"8888", "parent_id":0, "user_id":333, "amount":555, "reference_id":"666", "status":3, "transaction_type":3, "merchant_id":2, "update_time":1590416550, "create_time":1590416550 }} 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame macia kk <[hidden email]> 于2020年5月26日周二 上午9:34写道: > Flink version: 1.10 > > Json: > ```j > { > "database":"main_db", > "maxwell_ts":1590416550358000, > "table":"transaction_tab", > "data":{ > "transaction_sn":"8888", > "parent_id":0, > "user_id":333, > "amount":555, > "reference_id":"666", > "status":3, > "transaction_type":3, > "merchant_id":2, > "update_time":1590416550, > "create_time":1590416550 > } > } > ``` > > 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > > > > Leonard Xu <[hidden email]> 于2020年5月26日周二 上午8:58写道: > >> Hi, kk >> >> 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看 >> >> >> 祝好, >> Leonard Xu >> >> >> > 在 2020年5月26日,01:26,macia kk <[hidden email]> 写道: >> > >> > 有哪位大佬帮我看下,谢谢 >> > >> > >> > 尝试了很久,还是无法解析嵌套结构的Json >> > >> > Error >> > >> > Caused by: org.apache.flink.table.api.ValidationException: SQL >> > validation failed. From line 4, column 9 to line 4, column 31: Column >> > 'data.transaction_type' not found in any table >> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) >> > at >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) >> > at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) >> > at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) >> > at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) >> > at >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) >> > at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) >> > at >> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) >> > at >> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) >> > at >> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) >> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> > at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> > at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> > at java.lang.reflect.Method.invoke(Method.java:498) >> > at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) >> > >> > >> > 嵌套Json 定义的 format 和 schema 如下: >> > >> > .withFormat(new Json() >> > .jsonSchema( >> > """{type: 'object', >> > | properties: { >> > | database: { >> > | type: 'string' >> > | }, >> > | table: { >> > | type: 'string' >> > | }, >> > | maxwell_ts: { >> > | type: 'integer' >> > | }, >> > | data: { >> > | type: 'object', >> > | properties :{ >> > | reference_id :{ >> > | type: 'string' >> > | }, >> > | transaction_type :{ >> > | type: 'integer' >> > | }, >> > | merchant_id :{ >> > | type: 'integer' >> > | }, >> > | create_time :{ >> > | type: 'integer' >> > | }, >> > | status :{ >> > | type: 'integer' >> > | } >> > | } >> > | } >> > | } >> > | } >> > """.stripMargin.replaceAll("\n", " ") >> > ) >> > ) >> > .withSchema(new Schema() >> > .field("table", STRING()) >> > .field("database", STRING()) >> > .field("data", ROW(FIELD("reference_id",STRING()), >> > FIELD("transaction_type",INT()), FIELD("merchant_id",INT()), >> > FIELD("status",INT()))) >> > //.field("event_time", BIGINT()) >> > // .from("maxwell_ts") >> > //.rowtime(new Rowtime() >> > // //.timestampsFromField("ts" * 1000) >> > // .timestampsFromField("ts") >> > // .watermarksPeriodicBounded(60000) >> > //) >> > ) >> > >> > >> > bsTableEnv.sqlUpdate("""INSERT INTO yyyyy >> > | SELECT `table`, `database` >> > | `data.reference_id`, >> > | `data.transaction_type`, >> > | `data.merchant_id`, >> > | `data.create_time`, >> > | `data.status` >> > | FROM xxxx""".stripMargin) >> >> |
Hi,
你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子: create table my_source ( database varchar, maxwell_ts bigint, table varchar, data row< transaction_sn varchar, parent_id int, user_id int, amount int, reference_id varchar, status int, transaction_type int, merchant_id int, update_time int, create_time int > ) with ( ... ) macia kk <[hidden email]> 于2020年5月26日周二 上午9:36写道: > Flink version: 1.10 > > Json: > > { > "database":"main_db", > "maxwell_ts":1590416550358000, > "table":"transaction_tab", > "data":{ > "transaction_sn":"8888", > "parent_id":0, > "user_id":333, > "amount":555, > "reference_id":"666", > "status":3, > "transaction_type":3, > "merchant_id":2, > "update_time":1590416550, > "create_time":1590416550 > }} > > > 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > > > macia kk <[hidden email]> 于2020年5月26日周二 上午9:34写道: > > > Flink version: 1.10 > > > > Json: > > ```j > > { > > "database":"main_db", > > "maxwell_ts":1590416550358000, > > "table":"transaction_tab", > > "data":{ > > "transaction_sn":"8888", > > "parent_id":0, > > "user_id":333, > > "amount":555, > > "reference_id":"666", > > "status":3, > > "transaction_type":3, > > "merchant_id":2, > > "update_time":1590416550, > > "create_time":1590416550 > > } > > } > > ``` > > > > 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > > > > > > > > Leonard Xu <[hidden email]> 于2020年5月26日周二 上午8:58写道: > > > >> Hi, kk > >> > >> 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看 > >> > >> > >> 祝好, > >> Leonard Xu > >> > >> > >> > 在 2020年5月26日,01:26,macia kk <[hidden email]> 写道: > >> > > >> > 有哪位大佬帮我看下,谢谢 > >> > > >> > > >> > 尝试了很久,还是无法解析嵌套结构的Json > >> > > >> > Error > >> > > >> > Caused by: org.apache.flink.table.api.ValidationException: SQL > >> > validation failed. From line 4, column 9 to line 4, column 31: Column > >> > 'data.transaction_type' not found in any table > >> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > >> > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > >> > at > >> > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > >> > at > >> > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > >> > at > >> > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > >> > at > >> > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > >> > at > >> > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > >> > at > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > >> > at > >> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) > >> > at > >> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) > >> > at > >> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > >> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >> > at > >> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >> > at > >> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >> > at java.lang.reflect.Method.invoke(Method.java:498) > >> > at > >> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > >> > > >> > > >> > 嵌套Json 定义的 format 和 schema 如下: > >> > > >> > .withFormat(new Json() > >> > .jsonSchema( > >> > """{type: 'object', > >> > | properties: { > >> > | database: { > >> > | type: 'string' > >> > | }, > >> > | table: { > >> > | type: 'string' > >> > | }, > >> > | maxwell_ts: { > >> > | type: 'integer' > >> > | }, > >> > | data: { > >> > | type: 'object', > >> > | properties :{ > >> > | reference_id :{ > >> > | type: 'string' > >> > | }, > >> > | transaction_type :{ > >> > | type: 'integer' > >> > | }, > >> > | merchant_id :{ > >> > | type: 'integer' > >> > | }, > >> > | create_time :{ > >> > | type: 'integer' > >> > | }, > >> > | status :{ > >> > | type: 'integer' > >> > | } > >> > | } > >> > | } > >> > | } > >> > | } > >> > """.stripMargin.replaceAll("\n", " ") > >> > ) > >> > ) > >> > .withSchema(new Schema() > >> > .field("table", STRING()) > >> > .field("database", STRING()) > >> > .field("data", ROW(FIELD("reference_id",STRING()), > >> > FIELD("transaction_type",INT()), FIELD("merchant_id",INT()), > >> > FIELD("status",INT()))) > >> > //.field("event_time", BIGINT()) > >> > // .from("maxwell_ts") > >> > //.rowtime(new Rowtime() > >> > // //.timestampsFromField("ts" * 1000) > >> > // .timestampsFromField("ts") > >> > // .watermarksPeriodicBounded(60000) > >> > //) > >> > ) > >> > > >> > > >> > bsTableEnv.sqlUpdate("""INSERT INTO yyyyy > >> > | SELECT `table`, `database` > >> > | `data.reference_id`, > >> > | `data.transaction_type`, > >> > | `data.merchant_id`, > >> > | `data.create_time`, > >> > | `data.status` > >> > | FROM xxxx""".stripMargin) > >> > >> > -- Best, Benchao Li |
这个问题我想问下,通过row解析出来的每条记录的event time怎么定义,是直接在每条记录里面定义吗
create table my_source ( database varchar, maxwell_ts bigint, table varchar, data row< transaction_sn varchar, parent_id int, user_id int, amount int, reference_id varchar, status int, transaction_type int, merchant_id int, update_time int, create_time int ts AS CAST(FROM_UNIXTIME(create_time) AS TIMESTAMP(3)), // 定义事件时间 WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE > ) with ( ... ) 这样可以行吗 ------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年5月26日(星期二) 上午9:55 收件人: "user-zh"<[hidden email]>; 主题: Re: Flink SQL 嵌套 nested Json 解析 Hi, 你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子: create table my_source ( database varchar, maxwell_ts bigint, table varchar, data row< transaction_sn varchar, parent_id int, user_id int, amount int, reference_id varchar, status int, transaction_type int, merchant_id int, update_time int, create_time int > ) with ( ... ) macia kk <[hidden email]> 于2020年5月26日周二 上午9:36写道: > Flink version: 1.10 > > Json: > > { > "database":"main_db", > "maxwell_ts":1590416550358000, > "table":"transaction_tab", > "data":{ > "transaction_sn":"8888", > "parent_id":0, > "user_id":333, > "amount":555, > "reference_id":"666", > "status":3, > "transaction_type":3, > "merchant_id":2, > "update_time":1590416550, > "create_time":1590416550 > }} > > > 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > > > macia kk <[hidden email]> 于2020年5月26日周二 上午9:34写道: > > > Flink version: 1.10 > > > > Json: > > ```j > > { > > "database":"main_db", > > "maxwell_ts":1590416550358000, > > "table":"transaction_tab", > > "data":{ > > "transaction_sn":"8888", > > "parent_id":0, > > "user_id":333, > > "amount":555, > > "reference_id":"666", > > "status":3, > > "transaction_type":3, > > "merchant_id":2, > > "update_time":1590416550, > > "create_time":1590416550 > > } > > } > > ``` > > > > 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > > > > > > > > Leonard Xu <[hidden email]> 于2020年5月26日周二 上午8:58写道: > > > >> Hi, kk > >> > >> 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看 > >> > >> > >> 祝好, > >> Leonard Xu > >> > >> > >> > 在 2020年5月26日,01:26,macia kk <[hidden email]> 写道: > >> > > >> > 有哪位大佬帮我看下,谢谢 > >> > > >> > > >> > 尝试了很久,还是无法解析嵌套结构的Json > >> > > >> > Error > >> > > >> > Caused by: org.apache.flink.table.api.ValidationException: SQL > >> > validation failed. From line 4, column 9 to line 4, column 31: Column > >> > 'data.transaction_type' not found in any table > >> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > >> > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > >> > at > >> > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > >> > at > >> > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > >> > at > >> > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > >> > at > >> > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > >> > at > >> > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > >> > at > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > >> > at > >> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) > >> > at > >> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) > >> > at > >> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > >> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >> > at > >> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >> > at > >> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >> > at java.lang.reflect.Method.invoke(Method.java:498) > >> > at > >> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > >> > > >> > > >> > 嵌套Json 定义的 format 和 schema 如下: > >> > > >> > .withFormat(new Json() > >> > .jsonSchema( > >> > """{type: 'object', > >> > | properties: { > >> > | database: { > >> > | type: 'string' > >> > | }, > >> > | table: { > >> > | type: 'string' > >> > | }, > >> > | maxwell_ts: { > >> > | type: 'integer' > >> > | }, > >> > | data: { > >> > | type: 'object', > >> > | properties :{ > >> > | reference_id :{ > >> > | type: 'string' > >> > | }, > >> > | transaction_type :{ > >> > | type: 'integer' > >> > | }, > >> > | merchant_id :{ > >> > | type: 'integer' > >> > | }, > >> > | create_time :{ > >> > | type: 'integer' > >> > | }, > >> > | status :{ > >> > | type: 'integer' > >> > | } > >> > | } > >> > | } > >> > | } > >> > | } > >> > """.stripMargin.replaceAll("\n", " ") > >> > ) > >> > ) > >> > .withSchema(new Schema() > >> > .field("table", STRING()) > >> > .field("database", STRING()) > >> > .field("data", ROW(FIELD("reference_id",STRING()), > >> > FIELD("transaction_type",INT()), FIELD("merchant_id",INT()), > >> > FIELD("status",INT()))) > >> > //.field("event_time", BIGINT()) > >> > // .from("maxwell_ts") > >> > //.rowtime(new Rowtime() > >> > // //.timestampsFromField("ts" * 1000) > >> > // .timestampsFromField("ts") > >> > // .watermarksPeriodicBounded(60000) > >> > //) > >> > ) > >> > > >> > > >> > bsTableEnv.sqlUpdate("""INSERT INTO yyyyy > >> > | SELECT `table`, `database` > >> > | `data.reference_id`, > >> > | `data.transaction_type`, > >> > | `data.merchant_id`, > >> > | `data.create_time`, > >> > | `data.status` > >> > | FROM xxxx""".stripMargin) > >> > >> > -- Best, Benchao Li |
嗯,直接在DDL里面用WATERMARK语法来定义就可以,你这个定义我感觉是没有问题的。
claylin <[hidden email]> 于2020年5月26日周二 上午10:07写道: > 这个问题我想问下,通过row解析出来的每条记录的event time怎么定义,是直接在每条记录里面定义吗 > > > create table my_source ( > database varchar, > maxwell_ts bigint, > table varchar, > data row< > transaction_sn varchar, > parent_id int, > user_id int, > amount int, > reference_id varchar, > status int, > transaction_type int, > merchant_id int, > update_time int, > create_time int > ts AS CAST(FROM_UNIXTIME(create_time) AS > TIMESTAMP(3)), // 定义事件时间 > WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE > > > ) with ( > ... > ) > > > 这样可以行吗 > > > ------------------ 原始邮件 ------------------ > 发件人: "Benchao Li"<[hidden email]>; > 发送时间: 2020年5月26日(星期二) 上午9:55 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: Flink SQL 嵌套 nested Json 解析 > > > > Hi, > > 你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子: > create table my_source ( > database varchar, > maxwell_ts bigint, > table varchar, > data row< > transaction_sn varchar, > parent_id int, > user_id int, > amount int, > reference_id varchar, > status int, > transaction_type int, > merchant_id int, > update_time int, > create_time int > > > ) with ( > ... > ) > > macia kk <[hidden email]> 于2020年5月26日周二 上午9:36写道: > > > Flink version: 1.10 > > > > Json: > > > > { > > "database":"main_db", > > "maxwell_ts":1590416550358000, > > "table":"transaction_tab", > > "data":{ > > > "transaction_sn":"8888", > > "parent_id":0, > > "user_id":333, > > "amount":555, > > "reference_id":"666", > > "status":3, > > "transaction_type":3, > > "merchant_id":2, > > > "update_time":1590416550, > > > "create_time":1590416550 > > }} > > > > > > 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > > > > > > macia kk <[hidden email]> 于2020年5月26日周二 上午9:34写道: > > > > > Flink version: 1.10 > > > > > > Json: > > > ```j > > > { > > > "database":"main_db", > > > "maxwell_ts":1590416550358000, > > > "table":"transaction_tab", > > > "data":{ > > > > "transaction_sn":"8888", > > > "parent_id":0, > > > "user_id":333, > > > "amount":555, > > > > "reference_id":"666", > > > "status":3, > > > > "transaction_type":3, > > > "merchant_id":2, > > > > "update_time":1590416550, > > > > "create_time":1590416550 > > > } > > > } > > > ``` > > > > > > 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > > > > > > > > > > > > Leonard Xu <[hidden email]> 于2020年5月26日周二 上午8:58写道: > > > > > >> Hi, kk > > >> > > >> 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看 > > >> > > >> > > >> 祝好, > > >> Leonard Xu > > >> > > >> > > >> > 在 2020年5月26日,01:26,macia kk <[hidden email]> 写道: > > >> > > > >> > 有哪位大佬帮我看下,谢谢 > > >> > > > >> > > > >> > 尝试了很久,还是无法解析嵌套结构的Json > > >> > > > >> > Error > > >> > > > >> > Caused by: > org.apache.flink.table.api.ValidationException: SQL > > >> > validation failed. From line 4, column 9 to line 4, > column 31: Column > > >> > 'data.transaction_type' not found in any table > > >> > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > > >> > > > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > > >> > at > > >> > > > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > > >> > at > > >> > > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > > >> > at > > >> > > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > > >> > at > > >> > > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > > >> > at > > >> > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > > >> > at > > >> > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > >> > at > > >> > > > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) > > >> > at > > >> > > > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) > > >> > at > > >> > > > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > > >> > at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > >> > at > > >> > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > >> > at > > >> > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > >> > at > java.lang.reflect.Method.invoke(Method.java:498) > > >> > at > > >> > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > > >> > > > >> > > > >> > 嵌套Json 定义的 format 和 schema 如下: > > >> > > > >> > .withFormat(new Json() > > >> > .jsonSchema( > > >> > > """{type: 'object', > > >> > > | > properties: { > > >> > > > | database: { > > >> > > > | type: 'string' > > >> > > > | }, > > >> > > > | table: { > > >> > > > | type: 'string' > > >> > > > | }, > > >> > > > | maxwell_ts: { > > >> > > > | type: 'integer' > > >> > > > | }, > > >> > > > | data: { > > >> > > > | type: 'object', > > >> > > > | properties :{ > > >> > > > | > reference_id :{ > > >> > > > | > type: 'string' > > >> > > > | > }, > > >> > > > | > transaction_type :{ > > >> > > > | > type: 'integer' > > >> > > > | > }, > > >> > > > | > merchant_id :{ > > >> > > > | > type: 'integer' > > >> > > > | > }, > > >> > > > | > create_time :{ > > >> > > > | > type: 'integer' > > >> > > > | > }, > > >> > > > | > status :{ > > >> > > > | > type: 'integer' > > >> > > > | > } > > >> > > > | } > > >> > > > | } > > >> > > > | } > > >> > > | } > > >> > > """.stripMargin.replaceAll("\n", " ") > > >> > ) > > >> > ) > > >> > .withSchema(new Schema() > > >> > > .field("table", STRING()) > > >> > > .field("database", STRING()) > > >> > > .field("data", ROW(FIELD("reference_id",STRING()), > > >> > FIELD("transaction_type",INT()), > FIELD("merchant_id",INT()), > > >> > FIELD("status",INT()))) > > >> > > //.field("event_time", BIGINT()) > > >> > // > .from("maxwell_ts") > > >> > > //.rowtime(new Rowtime() > > >> > // > //.timestampsFromField("ts" * 1000) > > >> > // > .timestampsFromField("ts") > > >> > // > .watermarksPeriodicBounded(60000) > > >> > //) > > >> > ) > > >> > > > >> > > > >> > bsTableEnv.sqlUpdate("""INSERT INTO > yyyyy > > >> > > > | SELECT `table`, `database` > > >> > > > | `data.reference_id`, > > >> > > > | `data.transaction_type`, > > >> > > > | `data.merchant_id`, > > >> > > > | `data.create_time`, > > >> > > > | `data.status` > > >> > > > | FROM xxxx""".stripMargin) > > >> > > >> > > > > > -- > > Best, > Benchao Li -- Best, Benchao Li |
嗯 谢谢 我试下看下
------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年5月26日(星期二) 上午10:09 收件人: "user-zh"<[hidden email]>; 主题: Re: Flink SQL 嵌套 nested Json 解析 嗯,直接在DDL里面用WATERMARK语法来定义就可以,你这个定义我感觉是没有问题的。 claylin <[hidden email]> 于2020年5月26日周二 上午10:07写道: > 这个问题我想问下,通过row解析出来的每条记录的event time怎么定义,是直接在每条记录里面定义吗 > > > create table my_source ( > &nbsp; database varchar, > &nbsp; maxwell_ts bigint, > &nbsp; table varchar, > &nbsp; data row< > &nbsp;&nbsp;&nbsp; transaction_sn varchar, > &nbsp;&nbsp;&nbsp; parent_id int, > &nbsp;&nbsp;&nbsp; user_id int, > &nbsp;&nbsp;&nbsp; amount int, > &nbsp;&nbsp;&nbsp; reference_id varchar, > &nbsp;&nbsp;&nbsp; status int, > &nbsp;&nbsp;&nbsp; transaction_type int, > &nbsp;&nbsp;&nbsp; merchant_id int, > &nbsp;&nbsp;&nbsp; update_time int, > &nbsp;&nbsp;&nbsp; create_time int > &nbsp; &nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS > TIMESTAMP(3)),&nbsp; // 定义事件时间 > &nbsp; &nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE > &nbsp; &gt; > ) with ( > &nbsp;&nbsp;&nbsp; ... > ) > > > 这样可以行吗 > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;; > 发送时间:&nbsp;2020年5月26日(星期二) 上午9:55 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: Flink SQL 嵌套 nested Json 解析 > > > > Hi, > > 你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子: > create table my_source ( > &nbsp; database varchar, > &nbsp; maxwell_ts bigint, > &nbsp; table varchar, > &nbsp; data row< > &nbsp;&nbsp;&nbsp; transaction_sn varchar, > &nbsp;&nbsp;&nbsp; parent_id int, > &nbsp;&nbsp;&nbsp; user_id int, > &nbsp;&nbsp;&nbsp; amount int, > &nbsp;&nbsp;&nbsp; reference_id varchar, > &nbsp;&nbsp;&nbsp; status int, > &nbsp;&nbsp;&nbsp; transaction_type int, > &nbsp;&nbsp;&nbsp; merchant_id int, > &nbsp;&nbsp;&nbsp; update_time int, > &nbsp;&nbsp;&nbsp; create_time int > &nbsp; &gt; > ) with ( > &nbsp;&nbsp;&nbsp; ... > ) > > macia kk <[hidden email]&gt; 于2020年5月26日周二 上午9:36写道: > > &gt; Flink version: 1.10 > &gt; > &gt; Json: > &gt; > &gt; { > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db", > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000, > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab", > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{ > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "transaction_sn":"8888", > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "reference_id":"666", > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "transaction_type":3, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "update_time":1590416550, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "create_time":1590416550 > &gt;&nbsp;&nbsp;&nbsp;&nbsp; }} > &gt; > &gt; > &gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > &gt; > &gt; > &gt; macia kk <[hidden email]&gt; 于2020年5月26日周二 上午9:34写道: > &gt; > &gt; &gt; Flink version: 1.10 > &gt; &gt; > &gt; &gt; Json: > &gt; &gt; ```j > &gt; &gt; { > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db", > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab", > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{ > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "transaction_sn":"8888", > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "reference_id":"666", > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "transaction_type":3, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "update_time":1590416550, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "create_time":1590416550 > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; } > &gt; &gt; } > &gt; &gt; ``` > &gt; &gt; > &gt; &gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > &gt; &gt; > &gt; &gt; > &gt; &gt; > &gt; &gt; Leonard Xu <[hidden email]&gt; 于2020年5月26日周二 上午8:58写道: > &gt; &gt; > &gt; &gt;&gt; Hi, kk > &gt; &gt;&gt; > &gt; &gt;&gt; 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看 > &gt; &gt;&gt; > &gt; &gt;&gt; > &gt; &gt;&gt; 祝好, > &gt; &gt;&gt; Leonard Xu > &gt; &gt;&gt; > &gt; &gt;&gt; > &gt; &gt;&gt; &gt; 在 2020年5月26日,01:26,macia kk <[hidden email]&gt; 写道: > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; 有哪位大佬帮我看下,谢谢 > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; 尝试了很久,还是无法解析嵌套结构的Json > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; Error > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; Caused by: > org.apache.flink.table.api.ValidationException: SQL > &gt; &gt;&gt; &gt; validation failed. From line 4, column 9 to line 4, > column 31: Column > &gt; &gt;&gt; &gt; 'data.transaction_type' not found in any table > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > &gt; &gt;&gt; > &gt; > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > java.lang.reflect.Method.invoke(Method.java:498) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; 嵌套Json 定义的 format 和 schema 如下: > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withFormat(new Json() > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .jsonSchema( > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > """{type: 'object', > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp; > properties: { > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; database: { > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; table: { > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; maxwell_ts: { > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'integer' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; data: { > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'object', > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; properties :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > reference_id :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'string' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > transaction_type :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'integer' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > merchant_id :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'integer' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > create_time :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'integer' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > status :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'integer' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > } > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp; } > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; | } > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > """.stripMargin.replaceAll("\n", " ") > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withSchema(new Schema() > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .field("table", STRING()) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .field("database", STRING()) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .field("data", ROW(FIELD("reference_id",STRING()), > &gt; &gt;&gt; &gt; FIELD("transaction_type",INT()), > FIELD("merchant_id",INT()), > &gt; &gt;&gt; &gt; FIELD("status",INT()))) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //.field("event_time", BIGINT()) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; > .from("maxwell_ts") > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //.rowtime(new Rowtime() > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; > //.timestampsFromField("ts" * 1000) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; > .timestampsFromField("ts") > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; > .watermarksPeriodicBounded(60000) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; bsTableEnv.sqlUpdate("""INSERT INTO > yyyyy > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > | SELECT `table`, `database` > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.reference_id`, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.transaction_type`, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.merchant_id`, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.create_time`, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.status` > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > | FROM xxxx""".stripMargin) > &gt; &gt;&gt; > &gt; &gt;&gt; > &gt; > > > -- > > Best, > Benchao Li -- Best, Benchao Li |
Hi,
关于 Json 的解析,当你的 Json 里面的一个字段一个镶嵌类型的话,可以将其定义为一个 row,row 里面还可以定义 row 字段。 注意 row 里面的字段名称要和原始json 里面的字段一致。 Best, LakeShen claylin <[hidden email]> 于2020年5月26日周二 上午10:17写道: > 嗯 谢谢 我试下看下 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Benchao Li"<[hidden email]>; > 发送时间: 2020年5月26日(星期二) 上午10:09 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: Flink SQL 嵌套 nested Json 解析 > > > > 嗯,直接在DDL里面用WATERMARK语法来定义就可以,你这个定义我感觉是没有问题的。 > > claylin <[hidden email]> 于2020年5月26日周二 上午10:07写道: > > > 这个问题我想问下,通过row解析出来的每条记录的event time怎么定义,是直接在每条记录里面定义吗 > > > > > > create table my_source ( > > &nbsp; database varchar, > > &nbsp; maxwell_ts bigint, > > &nbsp; table varchar, > > &nbsp; data row< > > &nbsp;&nbsp;&nbsp; transaction_sn varchar, > > &nbsp;&nbsp;&nbsp; parent_id int, > > &nbsp;&nbsp;&nbsp; user_id int, > > &nbsp;&nbsp;&nbsp; amount int, > > &nbsp;&nbsp;&nbsp; reference_id varchar, > > &nbsp;&nbsp;&nbsp; status int, > > &nbsp;&nbsp;&nbsp; transaction_type int, > > &nbsp;&nbsp;&nbsp; merchant_id int, > > &nbsp;&nbsp;&nbsp; update_time int, > > &nbsp;&nbsp;&nbsp; create_time int > > &nbsp; &nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS > > TIMESTAMP(3)),&nbsp; // 定义事件时间 > > &nbsp; &nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE > > &nbsp; &gt; > > ) with ( > > &nbsp;&nbsp;&nbsp; ... > > ) > > > > > > 这样可以行吗 > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;; > > 发送时间:&nbsp;2020年5月26日(星期二) 上午9:55 > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > 主题:&nbsp;Re: Flink SQL 嵌套 nested Json 解析 > > > > > > > > Hi, > > > > 你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子: > > create table my_source ( > > &nbsp; database varchar, > > &nbsp; maxwell_ts bigint, > > &nbsp; table varchar, > > &nbsp; data row< > > &nbsp;&nbsp;&nbsp; transaction_sn varchar, > > &nbsp;&nbsp;&nbsp; parent_id int, > > &nbsp;&nbsp;&nbsp; user_id int, > > &nbsp;&nbsp;&nbsp; amount int, > > &nbsp;&nbsp;&nbsp; reference_id varchar, > > &nbsp;&nbsp;&nbsp; status int, > > &nbsp;&nbsp;&nbsp; transaction_type int, > > &nbsp;&nbsp;&nbsp; merchant_id int, > > &nbsp;&nbsp;&nbsp; update_time int, > > &nbsp;&nbsp;&nbsp; create_time int > > &nbsp; &gt; > > ) with ( > > &nbsp;&nbsp;&nbsp; ... > > ) > > > > macia kk <[hidden email]&gt; 于2020年5月26日周二 上午9:36写道: > > > > &gt; Flink version: 1.10 > > &gt; > > &gt; Json: > > &gt; > > &gt; { > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db", > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; > "maxwell_ts":1590416550358000, > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; > "table":"transaction_tab", > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{ > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "transaction_sn":"8888", > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "parent_id":0, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "user_id":333, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "amount":555, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "reference_id":"666", > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "status":3, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "transaction_type":3, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "merchant_id":2, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "update_time":1590416550, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "create_time":1590416550 > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; }} > > &gt; > > &gt; > > &gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > > &gt; > > &gt; > > &gt; macia kk <[hidden email]&gt; 于2020年5月26日周二 上午9:34写道: > > &gt; > > &gt; &gt; Flink version: 1.10 > > &gt; &gt; > > &gt; &gt; Json: > > &gt; &gt; ```j > > &gt; &gt; { > > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; > "database":"main_db", > > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; > "maxwell_ts":1590416550358000, > > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; > "table":"transaction_tab", > > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{ > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "transaction_sn":"8888", > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "parent_id":0, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "user_id":333, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "amount":555, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "reference_id":"666", > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "status":3, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "transaction_type":3, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "merchant_id":2, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "update_time":1590416550, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "create_time":1590416550 > > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; } > > &gt; &gt; } > > &gt; &gt; ``` > > &gt; &gt; > > &gt; &gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > > &gt; &gt; > > &gt; &gt; > > &gt; &gt; > > &gt; &gt; Leonard Xu <[hidden email]&gt; > 于2020年5月26日周二 上午8:58写道: > > &gt; &gt; > > &gt; &gt;&gt; Hi, kk > > &gt; &gt;&gt; > > &gt; &gt;&gt; > 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看 > > &gt; &gt;&gt; > > &gt; &gt;&gt; > > &gt; &gt;&gt; 祝好, > > &gt; &gt;&gt; Leonard Xu > > &gt; &gt;&gt; > > &gt; &gt;&gt; > > &gt; &gt;&gt; &gt; 在 2020年5月26日,01:26,macia kk < > [hidden email]&gt; 写道: > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; 有哪位大佬帮我看下,谢谢 > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; 尝试了很久,还是无法解析嵌套结构的Json > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; Error > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; Caused by: > > org.apache.flink.table.api.ValidationException: SQL > > &gt; &gt;&gt; &gt; validation failed. From line 4, > column 9 to line 4, > > column 31: Column > > &gt; &gt;&gt; &gt; 'data.transaction_type' not found > in any table > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > > &gt; &gt;&gt; > > &gt; > > > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > java.lang.reflect.Method.invoke(Method.java:498) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; 嵌套Json 定义的 format 和 schema 如下: > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withFormat(new > Json() > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .jsonSchema( > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > """{type: 'object', > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp; > > properties: { > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; database: { > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'string' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; table: { > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'string' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; maxwell_ts: { > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'integer' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; data: { > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'object', > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > properties :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > reference_id :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > type: 'string' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > transaction_type :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > type: 'integer' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > merchant_id :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > type: 'integer' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > create_time :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > type: 'integer' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > status :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > type: 'integer' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > } > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > } > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp; } > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > | } > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > """.stripMargin.replaceAll("\n", " ") > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > ) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withSchema(new > Schema() > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > .field("table", STRING()) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > .field("database", STRING()) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > .field("data", ROW(FIELD("reference_id",STRING()), > > &gt; &gt;&gt; &gt; FIELD("transaction_type",INT()), > > FIELD("merchant_id",INT()), > > &gt; &gt;&gt; &gt; FIELD("status",INT()))) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > //.field("event_time", BIGINT()) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //&nbsp; > > .from("maxwell_ts") > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > //.rowtime(new Rowtime() > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //&nbsp; > > //.timestampsFromField("ts" * 1000) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //&nbsp; > > .timestampsFromField("ts") > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //&nbsp; > > .watermarksPeriodicBounded(60000) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; > bsTableEnv.sqlUpdate("""INSERT INTO > > yyyyy > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > | SELECT `table`, `database` > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > `data.reference_id`, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > `data.transaction_type`, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > `data.merchant_id`, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > `data.create_time`, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > `data.status` > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > | FROM xxxx""".stripMargin) > > &gt; &gt;&gt; > > &gt; &gt;&gt; > > &gt; > > > > > > -- > > > > Best, > > Benchao Li > > > > -- > > Best, > Benchao Li |
Free forum by Nabble | Edit this page |