Flink SQL读取复杂JSON格式

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL读取复杂JSON格式

zapjone
Hello,各位大佬:
请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。
请问各位大佬有啥高招呢?谢谢。


kafka消息样例(data的value是动态的):
{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]}
我定义的schema:
create table kafka_message(
 source string,
data array<string>
)with...



Reply | Threaded
Open this post in threaded view
|

Re:Flink SQL读取复杂JSON格式

Appleyuchi
怎么个动态法?





在 2020-12-09 13:18:56,"破极" <[hidden email]> 写道:

>Hello,各位大佬:
>请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。
>请问各位大佬有啥高招呢?谢谢。
>
>
>kafka消息样例(data的value是动态的):
>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]}
>我定义的schema:
>create table kafka_message(
> source string,
>data array<string>
>)with...
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re:Flink SQL读取复杂JSON格式

zapjone
比如下面这种消息:
第一条消息:
{"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]}
第二条消息:
{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]}
第三条消息:
{"source":"transaction_2020202020200","data":[]}
我想直接在创建表时用一个字段来表示data这个属性的所有值。

















在 2020-12-09 13:21:41,"Appleyuchi" <[hidden email]> 写道:

>怎么个动态法?
>
>
>
>
>
>在 2020-12-09 13:18:56,"破极" <[hidden email]> 写道:
>>Hello,各位大佬:
>>请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。
>>请问各位大佬有啥高招呢?谢谢。
>>
>>
>>kafka消息样例(data的value是动态的):
>>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]}
>>我定义的schema:
>>create table kafka_message(
>> source string,
>>data array<string>
>>)with...
>>
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re:Re:Flink SQL读取复杂JSON格式

hailongwang
http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259
这个邮件列表有相似的问题,你看下有没有帮助。
PS:1.12 即将发布,也支持了 Raw 类型[1],也可以使用这个类型,然后代码自己 UDF 再处理。使用 Raw 类型也有个好处是,Source 消费不会因为 format 解析慢导致任务的瓶颈在拉数据慢,因为往往 Source 的并发度最大也只能是中间件的分区数,比如 Kakfa。
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/raw.html


Best,
hailong

在 2020-12-09 12:32:42,"破极" <[hidden email]> 写道:

>比如下面这种消息:
>第一条消息:
>{"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]}
>第二条消息:
>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]}
>第三条消息:
>{"source":"transaction_2020202020200","data":[]}
>我想直接在创建表时用一个字段来表示data这个属性的所有值。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-12-09 13:21:41,"Appleyuchi" <[hidden email]> 写道:
>>怎么个动态法?
>>
>>
>>
>>
>>
>>在 2020-12-09 13:18:56,"破极" <[hidden email]> 写道:
>>>Hello,各位大佬:
>>>请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。
>>>请问各位大佬有啥高招呢?谢谢。
>>>
>>>
>>>kafka消息样例(data的value是动态的):
>>>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]}
>>>我定义的schema:
>>>create table kafka_message(
>>> source string,
>>>data array<string>
>>>)with...
>>>
>>>
>>>
Reply | Threaded
Open this post in threaded view
|

Re:Re:Re:Re:Flink SQL读取复杂JSON格式

zapjone
刚才搜到了,谢谢

















在 2020-12-09 15:20:07,"hailongwang" <[hidden email]> 写道:

>http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259
>这个邮件列表有相似的问题,你看下有没有帮助。
>PS:1.12 即将发布,也支持了 Raw 类型[1],也可以使用这个类型,然后代码自己 UDF 再处理。使用 Raw 类型也有个好处是,Source 消费不会因为 format 解析慢导致任务的瓶颈在拉数据慢,因为往往 Source 的并发度最大也只能是中间件的分区数,比如 Kakfa。
>[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/raw.html
>
>
>Best,
>hailong
>
>在 2020-12-09 12:32:42,"破极" <[hidden email]> 写道:
>>比如下面这种消息:
>>第一条消息:
>>{"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]}
>>第二条消息:
>>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]}
>>第三条消息:
>>{"source":"transaction_2020202020200","data":[]}
>>我想直接在创建表时用一个字段来表示data这个属性的所有值。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2020-12-09 13:21:41,"Appleyuchi" <[hidden email]> 写道:
>>>怎么个动态法?
>>>
>>>
>>>
>>>
>>>
>>>在 2020-12-09 13:18:56,"破极" <[hidden email]> 写道:
>>>>Hello,各位大佬:
>>>>请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。
>>>>请问各位大佬有啥高招呢?谢谢。
>>>>
>>>>
>>>>kafka消息样例(data的value是动态的):
>>>>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]}
>>>>我定义的schema:
>>>>create table kafka_message(
>>>> source string,
>>>>data array<string>
>>>>)with...
>>>>
>>>>
>>>>
Reply | Threaded
Open this post in threaded view
|

Re:Re:Re:Re:Re:Flink SQL读取复杂JSON格式

zapjone
看了下邮箱列表中提到的方式,目前没打算升级flink,采用自定义format方式解决了这个问题。感谢各位大佬。

















在 2020-12-09 15:27:09,"破极" <[hidden email]> 写道:

>刚才搜到了,谢谢
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-12-09 15:20:07,"hailongwang" <[hidden email]> 写道:
>>http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259
>>这个邮件列表有相似的问题,你看下有没有帮助。
>>PS:1.12 即将发布,也支持了 Raw 类型[1],也可以使用这个类型,然后代码自己 UDF 再处理。使用 Raw 类型也有个好处是,Source 消费不会因为 format 解析慢导致任务的瓶颈在拉数据慢,因为往往 Source 的并发度最大也只能是中间件的分区数,比如 Kakfa。
>>[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/raw.html
>>
>>
>>Best,
>>hailong
>>
>>在 2020-12-09 12:32:42,"破极" <[hidden email]> 写道:
>>>比如下面这种消息:
>>>第一条消息:
>>>{"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]}
>>>第二条消息:
>>>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]}
>>>第三条消息:
>>>{"source":"transaction_2020202020200","data":[]}
>>>我想直接在创建表时用一个字段来表示data这个属性的所有值。
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>在 2020-12-09 13:21:41,"Appleyuchi" <[hidden email]> 写道:
>>>>怎么个动态法?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>在 2020-12-09 13:18:56,"破极" <[hidden email]> 写道:
>>>>>Hello,各位大佬:
>>>>>请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。
>>>>>请问各位大佬有啥高招呢?谢谢。
>>>>>
>>>>>
>>>>>kafka消息样例(data的value是动态的):
>>>>>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]}
>>>>>我定义的schema:
>>>>>create table kafka_message(
>>>>> source string,
>>>>>data array<string>
>>>>>)with...
>>>>>
>>>>>
>>>>>