Hi, guaishushu

Please paste the query, or a link to an image host. flink-sql's JSON parsing support is fairly complete [1]. Could you paste the JSON schema and the problematic data? A unit test should be enough to reproduce the problem.

Best,
Leonard

[1] https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java

> On 2020-05-20 at 15:51, [hidden email] wrote:
>
> Kafka data is written back into Kafka; flink1.10-sql parses fields inside complex JSON as string, which causes data loss.
>
> [hidden email]
Statements:

CREATE TABLE A (
    w_data STRING,
    w_table STRING,
    w_ts TIMESTAMP(3)
)

CREATE TABLE B (
    w_ts TIMESTAMP(3),
    city1_id STRING,
    cate3_id STRING,
    pay_order_id STRING
)

INSERT INTO B
SELECT w_ts,
    'test' AS city1_id,
    ArrayIndexOf(w_data, 0) AS cate3_id,
    w_data AS pay_order_id
FROM A

Sample data:

A
{"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82&onlineFee=89.0&outTradeNo=0&payId=0&payType=02&rechargeId=4&totalFee=89.0&tradeStatus=success&userId=32590183789575&sign=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"cccc111"}

B
{"w_ts":"2020-05-20T13:58:37.131Z","city1_id":"test","cate3_id":null,"pay_order_id":""}

[hidden email]

From: Leonard Xu
Sent: 2020-05-20 16:03
To: user-zh
Subject: Re: Flink 1.10-SQL parsing complex JSON
Flink's JSON parsing uses Jackson directly. If you declare the field as VARCHAR, it calls JsonNode.asText() directly, and for a container node (that is, a complex type) that should return an empty string.
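The point above can be illustrated outside of Flink. The sketch below is plain Python that mimics the semantics of Jackson's JsonNode.asText() (it does not call Jackson itself); it shows why a STRING-declared field backed by a JSON array comes out as the empty string seen in table B:

```python
import json

def as_text(node):
    """Mimic Jackson's JsonNode.asText(): value nodes are converted to
    their string form, while container nodes (JSON objects and arrays)
    yield an empty string."""
    if isinstance(node, (dict, list)):
        return ""  # container node -> "", the empty pay_order_id seen in B
    if isinstance(node, bool):
        return "true" if node else "false"
    if node is None:
        return "null"
    return str(node)

record = json.loads(
    '{"w_data":[{"online_fee":"89.0","sign":"00"}],"w_table":"cccc111"}'
)
print(repr(as_text(record["w_data"])))   # the array collapses to ''
print(repr(as_text(record["w_table"])))  # plain strings survive: 'cccc111'
```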
On Wed, May 20, 2020 at 6:06 PM [hidden email] <[hidden email]> wrote:

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: [hidden email]; [hidden email]
Hi,
> Statements:
> CREATE TABLE A (
>     w_data STRING,
>     w_table STRING,
>     w_ts TIMESTAMP(3)

If you need the w_data field to be a JSON array, you have to declare the corresponding structured data type when declaring the table [1]. That is, your table A here needs to be declared as:

create table json_table (
    w_es BIGINT,
    w_type STRING,
    w_isDdl BOOLEAN,
    w_data ARRAY<ROW<pay_info STRING, online_fee DOUBLE, sign STRING, account_pay_fee DOUBLE>>,
    w_ts TIMESTAMP(3),
    w_table STRING
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = 'json-test',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.group.id' = 'test-jdbc',
    'connector.startup-mode' = 'earliest-offset',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
)

Best,
Leonard Xu

> On 2020-05-20 at 18:06, [hidden email] wrote:
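To see what the structured ARRAY<ROW<...>> declaration above buys you, here is a small Python sketch (not Flink code; the record is an abbreviated version of sample A from this thread, with the long pay_info value shortened) showing how each array element becomes a structured row instead of being flattened away:

```python
import json

# Abbreviated version of sample record A from this thread.
raw = (
    '{"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,'
    '"w_data":[{"pay_info":"channelId=82&onlineFee=89.0","online_fee":"89.0",'
    '"sign":"00","account_pay_fee":"0.0"}],'
    '"w_ts":"2020-05-20T13:58:37.131Z","w_table":"cccc111"}'
)
record = json.loads(raw)

# With w_data declared as ARRAY<ROW<pay_info, online_fee, sign,
# account_pay_fee>>, each element is a typed row rather than the empty
# string produced when the field is declared STRING.
rows = [
    (e["pay_info"], float(e["online_fee"]), e["sign"], float(e["account_pay_fee"]))
    for e in record["w_data"]
]
print(rows[0][1])  # online_fee of the first element: 89.0
```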
+ Adding a documentation link [1], plus a link to an issue for a potential problem you might run into [2]:

[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/types.html#%E7%BB%93%E6%9E%84%E5%8C%96%E7%9A%84%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B
[2] https://issues.apache.org/jira/browse/FLINK-17847

> On 2020-05-21 at 00:01, Leonard Xu <[hidden email]> wrote: