python flink_cep_example.py exits after a few seconds, but it should keep running indefinitely.
The code is as follows, using MATCH_RECOGNIZE:

# imports needed to run the snippet
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
st_env = StreamTableEnvironment.create(s_env, environment_settings=b_s_settings)
configuration = st_env.get_config().get_configuration()
configuration.set_string("taskmanager.memory.task.off-heap.size", "500m")

s_env.set_parallelism(1)

kafka_source = """CREATE TABLE source (
    flow_name STRING,
    flow_id STRING,
    component STRING,
    filename STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'cep',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
)"""

postgres_sink = """
CREATE TABLE cep_result (
    `filename` STRING,
    `start_tstamp` TIMESTAMP(3),
    `end_tstamp` TIMESTAMP(3)
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
    'connector.table' = 'cep_result',
    'connector.driver' = 'org.postgresql.Driver',
    'connector.username' = 'postgres',
    'connector.password' = 'my_password',
    'connector.write.flush.max-rows' = '1'
)
"""

st_env.sql_update(kafka_source)
st_env.sql_update(postgres_sink)

postgres_sink_sql = '''
INSERT INTO cep_result
SELECT *
FROM source
MATCH_RECOGNIZE (
    PARTITION BY filename
    ORDER BY event_time
    MEASURES
        (A.event_time) AS start_tstamp,
        (D.event_time) AS end_tstamp
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A B C D)
    DEFINE
        A AS component = 'XXX',
        B AS component = 'YYY',
        C AS component = 'ZZZ',
        D AS component = 'WWW'
) MR
'''

sql_result = st_env.execute_sql(postgres_sink_sql)
Hi,
execute_sql is an asynchronous, non-blocking method, so you need to add

sql_result.get_job_client().get_job_execution_result().result()

at the end of your code. I have created a JIRA ticket for this [1].

[1] https://issues.apache.org/jira/browse/FLINK-18598

Best,
Xingbo
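For reference, a minimal sketch of how the tail of the script would look with the blocking call in place (variable names taken from the code above; this assumes PyFlink 1.11, where execute_sql returns a TableResult):

sql_result = st_env.execute_sql(postgres_sink_sql)
# execute_sql() only submits the job and returns immediately; without an
# explicit wait the Python process exits a few seconds after submission.
# Block until the job terminates (a streaming job runs indefinitely):
sql_result.get_job_client().get_job_execution_result().result()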
Thanks. It works after adding that line.
Switching back to the old sql_update followed by st_env.execute("job") also seems to work.
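For completeness, a minimal sketch of that legacy variant (assuming the rest of the script is unchanged; sql_update/execute are the pre-1.11 API):

# register the INSERT statement instead of executing it directly
st_env.sql_update(postgres_sink_sql)
# execute() submits the job and blocks until it finishes, which is why
# the script keeps running under the old API
st_env.execute("job")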
Yes. execute is the API for 1.10 and earlier; execute_sql is the recommended API from 1.11 onward.
Best,
Xingbo