Hello,各位大佬:
请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。 请问各位大佬有啥高招呢?谢谢。 kafka消息样例(data的value是动态的): {"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]} 我定义的schema: create table kafka_message( source string, data array<string> )with... |
怎么个动态法?
在 2020-12-09 13:18:56,"破极" <[hidden email]> 写道: >Hello,各位大佬: >请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。 >请问各位大佬有啥高招呢?谢谢。 > > >kafka消息样例(data的value是动态的): >{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]} >我定义的schema: >create table kafka_message( > source string, >data array<string> >)with... > > > |
比如下面这种消息:
第一条消息: {"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]} 第二条消息: {"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]} 第三条消息: {"source":"transaction_2020202020200","data":[]} 我想直接在创建表时用一个字段来表示data这个属性的所有值。 在 2020-12-09 13:21:41,"Appleyuchi" <[hidden email]> 写道: >怎么个动态法? > > > > > >在 2020-12-09 13:18:56,"破极" <[hidden email]> 写道: >>Hello,各位大佬: >>请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。 >>请问各位大佬有啥高招呢?谢谢。 >> >> >>kafka消息样例(data的value是动态的): >>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]} >>我定义的schema: >>create table kafka_message( >> source string, >>data array<string> >>)with... >> >> >> |
http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259
这个邮件列表有相似的问题,你看下有没有帮助。 PS:1.12 即将发布,也支持了 Raw 类型[1],也可以使用这个类型,然后代码自己 UDF 再处理。使用 Raw 类型也有个好处是,Source 消费不会因为 format 解析慢导致任务的瓶颈在拉数据慢,因为往往 Source 的并发度最大也只能是中间件的分区数,比如 Kakfa。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/raw.html Best, hailong 在 2020-12-09 12:32:42,"破极" <[hidden email]> 写道: >比如下面这种消息: >第一条消息: >{"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]} >第二条消息: >{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]} >第三条消息: >{"source":"transaction_2020202020200","data":[]} >我想直接在创建表时用一个字段来表示data这个属性的所有值。 > > > > > > > > > > > > > > > > > >在 2020-12-09 13:21:41,"Appleyuchi" <[hidden email]> 写道: >>怎么个动态法? >> >> >> >> >> >>在 2020-12-09 13:18:56,"破极" <[hidden email]> 写道: >>>Hello,各位大佬: >>>请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。 >>>请问各位大佬有啥高招呢?谢谢。 >>> >>> >>>kafka消息样例(data的value是动态的): >>>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]} >>>我定义的schema: >>>create table kafka_message( >>> source string, >>>data array<string> >>>)with... >>> >>> >>> |
刚才搜到了,谢谢
在 2020-12-09 15:20:07,"hailongwang" <[hidden email]> 写道: >http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259 >这个邮件列表有相似的问题,你看下有没有帮助。 >PS:1.12 即将发布,也支持了 Raw 类型[1],也可以使用这个类型,然后代码自己 UDF 再处理。使用 Raw 类型也有个好处是,Source 消费不会因为 format 解析慢导致任务的瓶颈在拉数据慢,因为往往 Source 的并发度最大也只能是中间件的分区数,比如 Kakfa。 >[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/raw.html > > >Best, >hailong > >在 2020-12-09 12:32:42,"破极" <[hidden email]> 写道: >>比如下面这种消息: >>第一条消息: >>{"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]} >>第二条消息: >>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]} >>第三条消息: >>{"source":"transaction_2020202020200","data":[]} >>我想直接在创建表时用一个字段来表示data这个属性的所有值。 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>在 2020-12-09 13:21:41,"Appleyuchi" <[hidden email]> 写道: >>>怎么个动态法? >>> >>> >>> >>> >>> >>>在 2020-12-09 13:18:56,"破极" <[hidden email]> 写道: >>>>Hello,各位大佬: >>>>请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。 >>>>请问各位大佬有啥高招呢?谢谢。 >>>> >>>> >>>>kafka消息样例(data的value是动态的): >>>>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]} >>>>我定义的schema: >>>>create table kafka_message( >>>> source string, >>>>data array<string> >>>>)with... >>>> >>>> >>>> |
看了下邮箱列表中提到的方式,目前没打算升级flink,采用自定义format方式解决了这个问题。感谢各位大佬。
在 2020-12-09 15:27:09,"破极" <[hidden email]> 写道: >刚才搜到了,谢谢 > > > > > > > > > > > > > > > > > >在 2020-12-09 15:20:07,"hailongwang" <[hidden email]> 写道: >>http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259 >>这个邮件列表有相似的问题,你看下有没有帮助。 >>PS:1.12 即将发布,也支持了 Raw 类型[1],也可以使用这个类型,然后代码自己 UDF 再处理。使用 Raw 类型也有个好处是,Source 消费不会因为 format 解析慢导致任务的瓶颈在拉数据慢,因为往往 Source 的并发度最大也只能是中间件的分区数,比如 Kakfa。 >>[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/raw.html >> >> >>Best, >>hailong >> >>在 2020-12-09 12:32:42,"破极" <[hidden email]> 写道: >>>比如下面这种消息: >>>第一条消息: >>>{"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]} >>>第二条消息: >>>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]} >>>第三条消息: >>>{"source":"transaction_2020202020200","data":[]} >>>我想直接在创建表时用一个字段来表示data这个属性的所有值。 >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>>在 2020-12-09 13:21:41,"Appleyuchi" <[hidden email]> 写道: >>>>怎么个动态法? >>>> >>>> >>>> >>>> >>>> >>>>在 2020-12-09 13:18:56,"破极" <[hidden email]> 写道: >>>>>Hello,各位大佬: >>>>>请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array<string>,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。 >>>>>请问各位大佬有啥高招呢?谢谢。 >>>>> >>>>> >>>>>kafka消息样例(data的value是动态的): >>>>>{"source":"transaction_2020202020200","data":[{"name":"d1111"},{"age":18}]} >>>>>我定义的schema: >>>>>create table kafka_message( >>>>> source string, >>>>>data array<string> >>>>>)with... >>>>> >>>>> >>>>> |
Free forum by Nabble | Edit this page |