hi,大家好,我遇到一个问题。
下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming",
"age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql?
CREATE TABLE mykafka1 (name String, age Int)
WITH (
'connector.type' = 'kafka',
'format.type' = 'json',
'update-mode' = 'append'
);
还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream<List<data>>,再转换flatMap转换成DataStream<data>,再使用tableEnv.fromDataStream把它变成tableSource?
--
Sent from:
http://apache-flink.147419.n8.nabble.com/