Hi all, I have a question. Inside Flink I define a table g_unit (initially empty) that receives writes from a Kafka source, and g_unit in turn serves as the input source for the downstream table g_summary. In testing I found that the g_line table never receives any data. The code is below; any help would be appreciated.
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

kafka_source_ddl = """
CREATE TABLE kafka_source_tab (
    id VARCHAR,
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'gg',
    'scan.startup.mode' = 'specific-offsets',
    'scan.startup.specific-offsets' = 'partition:1,offset:0',
    'properties.bootstrap.servers' = '****',
    'format' = 'json'
)
"""
g_unit_sink_ddl = """
CREATE TABLE g_sink_unit (
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
    'table-name' = 'g_unit',
    'username' = 'root',
    'password' = 'root',
    'sink.buffer-flush.interval' = '1s'
)
"""
g_summary_ddl = """
CREATE TABLE g_summary_base (
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
    'table-name' = 'g_summary',
    'username' = 'root',
    'password' = 'root',
    'sink.buffer-flush.interval' = '1s'
)
"""

t_env.execute_sql(kafka_source_ddl)
t_env.execute_sql(g_unit_sink_ddl)
t_env.execute_sql(g_summary_ddl)

sql1 = '''INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab'''
sql2 = '''INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM g_sink_unit'''

stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)

stmt_set.execute().get_job_client().get_job_execution_result().result()
You could first run only the first INSERT and check whether g_sink_unit receives any data.
Also, you can change the queries so that both read from kafka_source_tab and then write to the two different sinks separately:

sql1 = '''INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab'''
sql2 = '''INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM kafka_source_tab'''

小学生 <[hidden email]> wrote on Tue, Jul 21, 2020 at 5:47 PM:
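A minimal sketch of the suggested fix, using the table names from this thread. The key change is that sql2 selects from kafka_source_tab rather than from the JDBC sink table g_sink_unit: the JDBC connector's scan source is bounded, so an INSERT reading from g_sink_unit would only see rows already present in MySQL when the job starts, not the rows being streamed into it. The job-submission calls are shown as comments since they require a running Flink environment:

```python
# Both INSERTs read the Kafka source directly; nothing reads the JDBC
# sink table g_sink_unit, which is only scanned as a bounded snapshot.
sql1 = "INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab"
sql2 = "INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM kafka_source_tab"

# Submit both statements as one job so the Kafka source is shared:
# stmt_set = t_env.create_statement_set()
# stmt_set.add_insert_sql(sql1)
# stmt_set.add_insert_sql(sql2)
# stmt_set.execute().get_job_client().get_job_execution_result().result()
```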