各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode from pyflink.table import StreamTableEnvironment, EnvironmentSettings source=""" CREATE TABLE kafka_source_tab ( id VARCHAR, alarm_id VARCHAR, trck_id VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'alarm_test_g', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = '10.2.2.73:2181', 'properties.bootstrap.servers' = '10.2.2.73:9092', 'format' = 'json' ) """ sink=""" CREATE TABLE g_source_tab ( id VARCHAR, alarm_id VARCHAR, trck_id VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 'table-name' = 'g', 'username' = 'root', 'password' = '123456t', 'sink.buffer-flush.interval' = '1s' ) """ env = StreamExecutionEnvironment.get_execution_environment() env.set_stream_time_characteristic(TimeCharacteristic.EventTime) env.set_parallelism(1) env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() t_env = StreamTableEnvironment.create(env, environment_settings=env_settings) t_env.execute_sql(source) t_env.execute_sql(sink) source = t_env.from_path("kafka_source_tab")\ .select("id,alarm_id,trck_id") source.execute_insert("g_source_tab") |
Administrator
|
请问你是怎么提交的作业呢? 是在本地 IDEA 里面执行的,还是打成 jar 包后提交到集群运行的呢?
On Mon, 13 Jul 2020 at 17:58, 小学霸 <[hidden email]> wrote: > 各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。 > from pyflink.datastream import StreamExecutionEnvironment, > TimeCharacteristic, CheckpointingMode > from pyflink.table import StreamTableEnvironment, EnvironmentSettings > source=""" > CREATE TABLE kafka_source_tab ( > id VARCHAR, > alarm_id VARCHAR, > trck_id VARCHAR > > > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'alarm_test_g', > 'scan.startup.mode' = 'earliest-offset', > 'properties.bootstrap.servers' = '10.2.2.73:2181', > 'properties.bootstrap.servers' = '10.2.2.73:9092', > 'format' = 'json' > ) > """ > > sink=""" > CREATE TABLE g_source_tab ( > id VARCHAR, > alarm_id VARCHAR, > trck_id VARCHAR > > > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', > 'table-name' = 'g', > 'username' = 'root', > 'password' = '123456t', > 'sink.buffer-flush.interval' = '1s' > ) > """ > env = StreamExecutionEnvironment.get_execution_environment() > env.set_stream_time_characteristic(TimeCharacteristic.EventTime) > env.set_parallelism(1) > env_settings = > EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() > t_env = StreamTableEnvironment.create(env, > environment_settings=env_settings) > > > > t_env.execute_sql(source) > t_env.execute_sql(sink) > > > source = t_env.from_path("kafka_source_tab")\ > .select("id,alarm_id,trck_id") > source.execute_insert("g_source_tab") |
Free forum by Nabble | Edit this page |