Hi
Kafka中的JSON结构是个Array例子如下。 [ { "id": 1}, { "id": 2} ] 读出来变成表的两行。Flink SQL层面最佳实践是什么? 如果没有办法是不是只能改JSON结构了。 可爱的木兰 |
Hello,可爱的木兰
可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1] SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html> > 在 2020年7月14日,10:34,hua mulan <[hidden email]> 写道: > > 可爱的木兰 |
Hello,Leonard Xu
我这边JSON 不是 { "id": 2, "heap": [ { "foo": 14, "bar": "foo" }, { "foo": 16, "bar": "bar" } ], } 而是直接一个Array [ { "foo": 14, "bar": "foo" }, { "foo": 16, "bar": "bar" } ] 我发现DDL没法声明,SQL层面我不知道怎么做了。 可爱的木兰 ________________________________ 发件人: Leonard Xu <[hidden email]> 发送时间: 2020年7月14日 10:42 收件人: user-zh <[hidden email]> 主题: Re: Flink SQL处理Array型的JSON Hello,可爱的木兰 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1] SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html> > 在 2020年7月14日,10:34,hua mulan <[hidden email]> 写道: > > 可爱的木兰 |
In reply to this post by Leonard Xu
我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1]. [1] https://issues.apache.org/jira/browse/FLINK-18590 Leonard Xu <[hidden email]> 于2020年7月14日周二 上午10:42写道: > Hello,可爱的木兰 > > 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1] > > SELECT users, tag > FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) > > Best, > Leonard Xu > [1] > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html > < > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html > > > > > 在 2020年7月14日,10:34,hua mulan <[hidden email]> 写道: > > > > 可爱的木兰 > > -- Best, Benchao Li |
Hi
那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level 在导入Kafka,之后再FlinkSQL 处理。 可爱的木兰 ________________________________ 发件人: Benchao Li <[hidden email]> 发送时间: 2020年7月14日 11:00 收件人: user-zh <[hidden email]> 主题: Re: Flink SQL处理Array型的JSON 我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了, 我建了一个issue[1]. [1] https://issues.apache.org/jira/browse/FLINK-18590 Leonard Xu <[hidden email]> 于2020年7月14日周二 上午10:42写道: > Hello,可爱的木兰 > > 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1] > > SELECT users, tag > FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) > > Best, > Leonard Xu > [1] > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html > < > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html > > > > > 在 2020年7月14日,10:34,hua mulan <[hidden email]> 写道: > > > > 可爱的木兰 > > -- Best, Benchao Li |
如果不等待最新版本的话也可以这样
将 纯数组的数据作为字符串 从source消费,增加自定义的json解析函数,判断 isArray 之后 遍历进行 collect if (Objects.nonNull(str)) { if (isArray) { JsonNode node = objectMapper.readTree(str); if (node.isArray()) { Iterator<JsonNode> nodeIterator = node.elements(); while (nodeIterator.hasNext()) { collect(deserializationSchema.deserialize(nodeIterator.next().toString().getBytes())); } } } else { collect(deserializationSchema.deserialize(str.getBytes())); } } -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
补充:
最终查询为 SELECT t.* FROM kafka_source, LATERAL TABLE( fromJson(data) ) as t -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |