Flink table used as both sink and source: downstream gets no data

Flink table used as both sink and source: downstream gets no data

咿咿呀呀
Hi all, I have a question. Inside Flink I define a table g_unit (initially empty) that receives writes from a Kafka source; g_unit is also supposed to serve as the input source for the downstream table g_summary. In testing, the g_summary table never receives any data. The code is below; any help would be appreciated.




from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

# Kafka source table
kafka_source_ddl = """
CREATE TABLE kafka_source_tab (
  id VARCHAR,
  alarm_id VARCHAR,
  trck_id VARCHAR
) WITH (
  'connector' = 'kafka',
  'topic' = 'gg',
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:1,offset:0',
  'properties.bootstrap.servers' = '****',
  'format' = 'json'
)
"""

# JDBC table backed by MySQL table g_unit: written by the first INSERT,
# read back by the second
g_unit_sink_ddl = """
CREATE TABLE g_sink_unit (
  alarm_id VARCHAR,
  trck_id VARCHAR
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
  'table-name' = 'g_unit',
  'username' = 'root',
  'password' = 'root',
  'sink.buffer-flush.interval' = '1s'
)
"""

# JDBC sink backed by MySQL table g_summary
g_summary_ddl = """
CREATE TABLE g_summary_base (
  alarm_id VARCHAR,
  trck_id VARCHAR
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
  'table-name' = 'g_summary',
  'username' = 'root',
  'password' = 'root',
  'sink.buffer-flush.interval' = '1s'
)
"""

t_env.execute_sql(kafka_source_ddl)
t_env.execute_sql(g_unit_sink_ddl)
t_env.execute_sql(g_summary_ddl)

# Kafka -> g_sink_unit, then g_sink_unit (read back via JDBC) -> g_summary_base
sql1 = '''INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab'''
sql2 = '''INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM g_sink_unit'''

# Submit both INSERTs as a single job and block on its result
stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)
stmt_set.execute().get_job_client().get_job_execution_result().result()
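
A note on what the second INSERT actually sees (a minimal sketch, assuming the three DDLs above have already been executed): the Flink JDBC connector's table source performs a single bounded snapshot scan, so a query over g_sink_unit returns the rows present in MySQL at submission time and then finishes.

# Bounded snapshot scan: prints whatever is in MySQL table g_unit at
# submission time, then the query finishes; rows written later are not seen.
t_env.execute_sql("SELECT alarm_id, trck_id FROM g_sink_unit").print()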
Re: Flink table used as both sink and source: downstream gets no data

godfrey he
You could first run only the first INSERT and check whether g_sink_unit receives any data.

Alternatively, change the queries so that both read from kafka_source_tab and write to the two different sinks:

sql1 = '''INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab'''
sql2 = '''INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM kafka_source_tab'''
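
A minimal sketch of that first check, assuming the same t_env and DDLs as in the original post: execute_sql submits the INSERT and returns; since the Kafka source is unbounded, the job keeps running, and the MySQL table g_unit can be inspected while it does.

# Submit only the first INSERT; rows should start appearing in MySQL
# table g_unit shortly after messages arrive on the Kafka topic.
t_env.execute_sql(
    "INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab"
)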

Re: Flink table used as both sink and source: downstream gets no data

咿咿呀呀
In reply to this post by 咿咿呀呀
There is simply no data. This is a simplified version of my job; if everything is switched to read from the original Kafka source, it works fine.



Re: Flink table used as both sink and source: downstream gets no data

咿咿呀呀
In reply to this post by godfrey he
1. "Run only the first INSERT, then check whether g_sink_unit has data": it does have data.
2. In this example, switching everything to read from the original Kafka source does of course work, but in the real job g_unit genuinely has to act as a bridge.
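
The behavior is consistent with the JDBC table source being a one-off bounded scan: the second INSERT reads g_unit exactly once at job start (while it is still empty) and then finishes, never seeing what the first INSERT writes. If g_unit must stay in the middle, a possible workaround (a sketch only; the name g_unit_view is illustrative, not from the thread) is to keep the bridge inside Flink as a view and use the JDBC tables purely as sinks:

# Sketch reusing the DDLs from the original post. The view is the in-Flink
# bridge; g_sink_unit still materializes it to MySQL, while g_summary_base
# consumes the same continuous stream instead of a bounded JDBC scan.
t_env.execute_sql("""
CREATE VIEW g_unit_view AS
SELECT alarm_id, trck_id FROM kafka_source_tab
""")

stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(
    "INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM g_unit_view")
stmt_set.add_insert_sql(
    "INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM g_unit_view")
stmt_set.execute()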