Flink 1.10 SQL: problem parsing complex JSON

Flink 1.10 SQL: problem parsing complex JSON

guaishushu1103@163.com
Data is read from Kafka and written back to Kafka. When Flink 1.10 SQL parses a complex JSON field that is declared as STRING, the data is lost.




Re: Flink 1.10 SQL: problem parsing complex JSON

Leonard Xu
Hi, guaishushu
Please paste the query (or an image-hosting link). Flink SQL's JSON parsing support is fairly complete [1]. Could you also paste the JSON schema and the problematic data? A unit test should be enough to reproduce the issue.

Best,
Leonard

[1] https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java


This kind of complex data is parsed straight to null

guaishushu1103@163.com
Statements:
CREATE TABLE A (
  w_data STRING,
  w_table STRING,
  w_ts TIMESTAMP(3)
)

CREATE TABLE B (
  w_ts TIMESTAMP(3),
  city1_id STRING,
  cate3_id STRING,
  pay_order_id STRING
)

INSERT INTO B
SELECT w_ts,
  'test' AS city1_id,
  ArrayIndexOf(w_data, 0) AS cate3_id,
  w_data AS pay_order_id
FROM A

Sample data
A
{"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82&onlineFee=89.0&outTradeNo=0&payId=0&payType=02&rechargeId=4&totalFee=89.0&tradeStatus=success&userId=32590183789575&sign=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"cccc111"}

B
{"w_ts":"2020-05-20T13:58:37.131Z","city1_id":"test","cate3_id":null,"pay_order_id":""}



[hidden email]
 

Re: This kind of complex data is parsed straight to null

Benchao Li
Flink's JSON parsing uses Jackson directly. If you declare the field as VARCHAR, it calls JsonNode.asText() on the node, and for container types (i.e., complex types: objects and arrays) that returns an empty string.
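A minimal Python sketch of the conversion rule described above (illustration only — this is neither Flink nor Jackson code, and `as_text` is a hypothetical helper mimicking `JsonNode.asText()`): scalar nodes render as their text form, while container nodes collapse to an empty string, which matches the empty `pay_order_id` in the output record.

```python
import json

def as_text(node):
    # Mimics Jackson's JsonNode.asText(): scalar nodes render as text,
    # container nodes (objects/arrays) yield the empty string.
    if isinstance(node, (dict, list)):
        return ""
    if node is None:
        return "null"
    if isinstance(node, bool):
        return "true" if node else "false"
    return str(node)

record = json.loads('{"w_data":[{"online_fee":"89.0"}],"w_table":"cccc111"}')
print(repr(as_text(record["w_data"])))   # ''        -- the complex field is lost
print(repr(as_text(record["w_table"])))  # 'cccc111' -- the scalar field survives
```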



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Re: This kind of complex data is parsed straight to null

Leonard Xu
In reply to this post by guaishushu1103@163.com
Hi,

If you want the w_data field to be a JSON array, you need to declare the corresponding structured data type when declaring the table [1].

That is, your table A here needs to be declared as:
create table json_table (
  w_es BIGINT,
  w_type STRING,
  w_isDdl BOOLEAN,
  w_data ARRAY<ROW<pay_info STRING, online_fee DOUBLE, sign STRING, account_pay_fee DOUBLE>>,
  w_ts TIMESTAMP(3),
  w_table STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'json-test',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'test-jdbc',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)
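With w_data declared as ARRAY<ROW<...>>, each array element becomes a row whose fields can be addressed individually — in Flink SQL that would be w_data[1].pay_info, since SQL array access is 1-based. A small Python sketch of the same access on the sample record posted for table A earlier in the thread (illustration only, not Flink code; the pay_info value is abridged):

```python
import json

# The sample record posted for table A earlier in the thread (abridged).
sample = json.loads(
    '{"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,'
    '"w_data":[{"pay_info":"channelId=82&onlineFee=89.0","online_fee":"89.0",'
    '"sign":"00","account_pay_fee":"0.0"}],'
    '"w_ts":"2020-05-20T13:58:37.131Z","w_table":"cccc111"}'
)

# Python lists are 0-based; Flink SQL arrays are 1-based, so this
# corresponds to w_data[1].pay_info / w_data[1].online_fee in SQL.
first = sample["w_data"][0]
print(first["pay_info"])           # channelId=82&onlineFee=89.0
print(float(first["online_fee"]))  # 89.0
```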


Best,
Leonard Xu



Re: This kind of complex data is parsed straight to null

Leonard Xu
+ Adding a documentation link [1], plus a link to an issue for a potential problem you may run into [2]:

[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/types.html#%E7%BB%93%E6%9E%84%E5%8C%96%E7%9A%84%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B
[2] https://issues.apache.org/jira/browse/FLINK-17847
