|
Hi:
我Flink消费kafka的数据,我创建一张表如下:
val kafkaSource =
"""
|create table kafka_order(
|order_id string,
|order_price decimal(10,2),
|order_time timestamp(3)
|)
|with(
|'connector' = 'kafka',
|'topic' = 'iceberg.order',
|'properties.bootstrap.servers' = 'hostname:9092',
|'format' = 'json',
|'properties.group.id' = 'data-lake',
|'scan.startup.mode' = 'earliest-offset',
|'json.ignore-parse-errors' = 'false'
|)
|""".stripMargin
tEnv.executeSql(kafkaSource)
我直接查询后print到控制台时,没法消费成功,如下:
val query =
"""
|select * from kafka_order
|""".stripMargin
tEnv.executeSql(query).print()
但是我创建一个print的connect,然后insert into 表 select * from kafka_order这样是可以正常消费的,如下:
val print =
"""
|create table p_order(
|order_id string,
|order_price decimal(10,2),
|order_time timestamp(3)
|)
|with(
|'connector' = 'print'
|)
|""".stripMargin
tEnv.executeSql(print)
val query =
"""
|insert into p_order
|select * from kafka_order
|""".stripMargin
tEnv.executeSql(query)
这具体是为什么呢?望知道的大佬告知一下,感激不尽。
祝好!
|