Flink读取kafka没有报错也没有数据输出,Kafka消费端有数据,谢谢

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink读取kafka没有报错也没有数据输出,Kafka消费端有数据,谢谢

Appleyuchi
大佬们好
我的环境是:


| 组件 | 版本 |
|
 Flink
|
1.12
|
|
Kafka
|
2.5.0
|
|
Zookeeper
|
3.6.0
|




完整代码是
https://paste.ubuntu.com/p/pRWpvJw4b8/
kafka消费端(使用命令行消费)确认有数据输出。
但是上述代码没有输出,DDL检查过确认无误。


因为听说executeSql会提交任务,所以把最后一句execute给注销了。
求问应该如何修改代码才能让代码有输出?
谢谢
Reply | Threaded
Open this post in threaded view
|

Re: Flink读取kafka没有报错也没有数据输出,Kafka消费端有数据,谢谢

Leonard Xu
Hi,

> 因为听说executeSql会提交任务,所以把最后一句execute给注销了。


> val result: Table = tEnv.sqlQuery(query)
> tEnv.toRetractStream[Row](result).print()
> //    tEnv.execute("Flink SQL DDL")

DataStream程序的执行和Table/SQL程序的执行是解耦的,已经通过 tEnv.toRetractStream 转成 DataStrean 的程序后,需要调用 bsEnv.execute("test")

如果需要直接用SQL,可以直接:
tEnv.executeSql(query).print();

转换成datastream后应该类似这样
val result: Table = tEnv.sqlQuery(query)
tEnv.toRetractStream[Row](result).print()
bsEnv.execute("test”)

 

祝好,
Leonard

Reply | Threaded
Open this post in threaded view
|

Re:Re: Flink读取kafka没有报错也没有数据输出,Kafka消费端有数据,谢谢

Appleyuchi
解决了,谢谢

















在 2020-12-25 14:48:57,"Leonard Xu" <[hidden email]> 写道:

>Hi,
>
>> 因为听说executeSql会提交任务,所以把最后一句execute给注销了。
>
>
>> val result: Table = tEnv.sqlQuery(query)
>> tEnv.toRetractStream[Row](result).print()
>> //    tEnv.execute("Flink SQL DDL")
>
>DataStream程序的执行和Table/SQL程序的执行是解耦的,已经通过 tEnv.toRetractStream 转成 DataStrean 的程序后,需要调用 bsEnv.execute("test")
>
>如果需要直接用SQL,可以直接:
>tEnv.executeSql(query).print();
>
>转换成datastream后应该类似这样
>val result: Table = tEnv.sqlQuery(query)
>tEnv.toRetractStream[Row](result).print()
>bsEnv.execute("test”)
>
>
>
>祝好,
>Leonard