how flink-sql connector kafka reads array json

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

how flink-sql connector kafka reads array json

大罗
hi,大家好,我遇到一个问题。

下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming",
"age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql?

CREATE TABLE mykafka1 (name String, age Int)
WITH (
   'connector.type' = 'kafka',
   'format.type' = 'json',
   'update-mode' = 'append'
);

还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream<List&lt;data>>,再转换flatMap转换成DataStream<data>,再使用tableEnv.fromDataStream把它变成tableSource?




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: how flink-sql connector kafka reads array json

Benchao Li-2
Hi,

这个是一个已知的问题,已经有issue[1] 在跟进解决了。预计在1.12可以使用。

[1] https://issues.apache.org/jira/browse/FLINK-18590

大罗 <[hidden email]> 于2020年9月8日周二 上午10:39写道:

> hi,大家好,我遇到一个问题。
>
> 下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming",
> "age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql?
>
> CREATE TABLE mykafka1 (name String, age Int)
> WITH (
>    'connector.type' = 'kafka',
>    'format.type' = 'json',
>    'update-mode' = 'append'
> );
>
>
> 还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream<List&lt;data>>,再转换flatMap转换成DataStream<data>,再使用tableEnv.fromDataStream把它变成tableSource?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


--

Best,
Benchao Li