flink 1.11写入mysql问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

flink 1.11写入mysql问题

咿咿呀呀
各位大佬好,请教一个问题:我使用 Flink 1.11(PyFlink)从 Kafka 读取数据并写入 MySQL,程序运行时没有报错,但 MySQL 中没有写入任何数据。代码如下:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
# DDL for the Kafka-backed source table `kafka_source_tab`.
# Reads JSON records from topic `alarm_test_g`, starting at the earliest offset.
# 'properties.bootstrap.servers' is declared exactly once and points at the
# Kafka broker (port 9092). The original text also declared the same key with
# port 2181 (conventionally the ZooKeeper port), which was a redundant and
# misleading duplicate.
source = """
CREATE TABLE kafka_source_tab (
 id VARCHAR,
 alarm_id VARCHAR,
 trck_id VARCHAR
) WITH (
 'connector' = 'kafka',
 'topic' = 'alarm_test_g',
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = '10.2.2.73:9092',
 'format' = 'json'
)
"""

# DDL for the JDBC-backed sink table `g_source_tab`.
# It has the same three VARCHAR columns as the Kafka source table and maps to
# the MySQL table `g` reachable through the JDBC URL below.
# 'sink.buffer-flush.interval' = '1s' configures the connector's buffer flush
# interval. The DDL string is passed to Flink verbatim.
# NOTE(review): credentials are hard-coded in this string — confirm this is
# only a throwaway test setup.
sink="""
CREATE TABLE g_source_tab (
 id VARCHAR,   
 alarm_id VARCHAR,     
 trck_id VARCHAR


) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 
 'table-name' = 'g',   
 'username' = 'root',
 'password' = '123456t',
 'sink.buffer-flush.interval' = '1s'
)
"""
# Build a streaming TableEnvironment on the Blink planner with event-time
# semantics and a parallelism of 1.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

# Register the Kafka source table and the JDBC sink table from their DDL.
t_env.execute_sql(source)
t_env.execute_sql(sink)

# Project the three columns from the Kafka table and insert them into the sink.
source = t_env.from_path("kafka_source_tab").select("id,alarm_id,trck_id")
table_result = source.execute_insert("g_source_tab")

# execute_insert() only submits the job and returns immediately. Without
# waiting, a locally run script can exit (tearing down the mini cluster)
# before any row reaches MySQL: no error is raised and no data is written.
# Block on the job result so the process stays alive while the job runs.
# With the unbounded Kafka source this call does not return until the job
# is cancelled or fails.
table_result.get_job_client().get_job_execution_result().result()