Flink SQL处理Array型的JSON

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL处理Array型的JSON

deadwind4
Hi

Kafka中的JSON结构是个Array例子如下。
[
 { "id": 1},
 { "id": 2}
]
读出来变成表的两行。Flink SQL层面最佳实践是什么?
如果没有办法是不是只能改JSON结构了。


可爱的木兰
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL处理Array型的JSON

Leonard Xu
Hello,可爱的木兰

可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html>

> 在 2020年7月14日,10:34,hua mulan <[hidden email]> 写道:
>
> 可爱的木兰

Reply | Threaded
Open this post in threaded view
|

回复: Flink SQL处理Array型的JSON

deadwind4
Hello,Leonard Xu

我这边JSON 不是

{
    "id": 2,
    "heap": [
        {
            "foo": 14,
            "bar": "foo"

        },
        {
            "foo": 16,
            "bar": "bar"
        }
    ],
}

而是直接一个Array

[
        {
            "foo": 14,
            "bar": "foo"

        },
        {
            "foo": 16,
            "bar": "bar"
        }
    ]

我发现DDL没法声明,SQL层面我不知道怎么做了。

可爱的木兰

________________________________
发件人: Leonard Xu <[hidden email]>
发送时间: 2020年7月14日 10:42
收件人: user-zh <[hidden email]>
主题: Re: Flink SQL处理Array型的JSON

Hello,可爱的木兰

可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html>

> 在 2020年7月14日,10:34,hua mulan <[hidden email]> 写道:
>
> 可爱的木兰

Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL处理Array型的JSON

Benchao Li-2
In reply to this post by Leonard Xu
我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1].

[1] https://issues.apache.org/jira/browse/FLINK-18590

Leonard Xu <[hidden email]> 于2020年7月14日周二 上午10:42写道:

> Hello,可爱的木兰
>
> 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
>
> SELECT users, tag
> FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> >
>
> > 在 2020年7月14日,10:34,hua mulan <[hidden email]> 写道:
> >
> > 可爱的木兰
>
>

--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

回复: Flink SQL处理Array型的JSON

deadwind4
Hi

那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level 在导入Kafka,之后再FlinkSQL 处理。

可爱的木兰

________________________________
发件人: Benchao Li <[hidden email]>
发送时间: 2020年7月14日 11:00
收件人: user-zh <[hidden email]>
主题: Re: Flink SQL处理Array型的JSON

我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1].

[1] https://issues.apache.org/jira/browse/FLINK-18590

Leonard Xu <[hidden email]> 于2020年7月14日周二 上午10:42写道:

> Hello,可爱的木兰
>
> 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
>
> SELECT users, tag
> FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> >
>
> > 在 2020年7月14日,10:34,hua mulan <[hidden email]> 写道:
> >
> > 可爱的木兰
>
>

--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 回复: Flink SQL处理Array型的JSON

wxpcc
如果不等待最新版本的话也可以这样

将 纯数组的数据作为字符串 从source消费,增加自定义的json解析函数,判断 isArray 之后 遍历进行 collect

if (Objects.nonNull(str)) {
                if (isArray) {
                    JsonNode node = objectMapper.readTree(str);
                    if (node.isArray()) {
                        Iterator<JsonNode> nodeIterator = node.elements();
                        while (nodeIterator.hasNext()) {
                           
collect(deserializationSchema.deserialize(nodeIterator.next().toString().getBytes()));
                        }
                    }
                } else {
                   
collect(deserializationSchema.deserialize(str.getBytes()));
                }
            }



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 回复: Flink SQL处理Array型的JSON

wxpcc
补充:
最终查询为

SELECT
 t.*
FROM
  kafka_source,
  LATERAL TABLE( fromJson(data) ) as t



--
Sent from: http://apache-flink.147419.n8.nabble.com/