tEnv.executeSql(query).print() 这样不能成功消费kafka的数据

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

tEnv.executeSql(query).print() 这样不能成功消费kafka的数据

jy l
Hi:
我Flink消费kafka的数据,我创建一张表如下:

val kafkaSource =
      """
        |create table kafka_order(
        |order_id string,
        |order_price decimal(10,2),
        |order_time timestamp(3)
        |)
        |with(
        |'connector' = 'kafka',
        |'topic' = 'iceberg.order',
        |'properties.bootstrap.servers' = 'hostname:9092',
        |'format' = 'json',
        |'properties.group.id' = 'data-lake',
        |'scan.startup.mode' = 'earliest-offset',
        |'json.ignore-parse-errors' = 'false'
        |)
        |""".stripMargin
    tEnv.executeSql(kafkaSource)

我直接查询后print到控制台时,没法消费成功,如下:
 val query =
      """
        |select * from kafka_order
        |""".stripMargin
    tEnv.executeSql(query).print()

但是我创建一个print的connect,然后insert into 表 select * from kafka_order这样是可以正常消费的,如下:
val print =
      """
        |create table p_order(
        |order_id string,
        |order_price decimal(10,2),
        |order_time timestamp(3)
        |)
        |with(
        |'connector' = 'print'
        |)
        |""".stripMargin
    tEnv.executeSql(print)
    val query =
      """
        |insert into p_order
        |select * from kafka_order
        |""".stripMargin
    tEnv.executeSql(query)

这具体是为什么呢?望知道的大佬告知一下,感激不尽。

祝好!