如何讓兩個 SQL 使用相同的 KafkaTableSource

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

如何讓兩個 SQL 使用相同的 KafkaTableSource

Tony Wei
Hi

我在我的 flink job 中透過 `flinkTableEnv.connect(new
Kafka()...).registerTableSource(...)` 註冊了
一張 kafka table。但從文件上我才知道 SQL 只會在特定的條件下才會真正的轉為 DataStream,比
如說呼叫了Table#toRetractStream`。

因為如此,我發現當我嘗試在同一個 flink job 中使用了不同的 SQL 時,他們會同時產生各自的
kafka source operator。從 flink 的角度來說可能不是什麼大問題,各自獨立的 operator 會各自管理
好自己的 offset state,也不會互相影響。但是從 kafka 方面來看,因為兩邊都是使用相同的
group_id,當 offset 被 commit 回 kafka 時,就會在 kafka 端有衝突。

我想要確保每個 group_id 只會被一個 operator 負責執行 commit 的動作。最簡單的做法可能是故意
為相同的 kafka topic 註冊兩個名稱不同的 table, group_id,分別給兩個 SQL 使用。但我想知道是
不是有更好的做法,可以讓兩個 SQL 是真正的從同一個 kafka operator 讀取資料?這樣也不需要同
時存在兩個做一樣事情的 kafka operator 。先謝謝各位的幫助。

Best,
Tony Wei