from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# pyflink 1.11.1
environment_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
senv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(senv, environment_settings=environment_settings)
source_ddl = "create table sc(wd varchar,cnt int,c int,hu varchar)with('connector'='filesystem','path'='/home/xzh/mypython/bigdata/flink/stream/input/','format'='csv')"
sink_ddl = "create table print_sink(wd varchar,cnt bigint)with('connector'='print')"

# Case 1: runs successfully
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.sql_update("insert into print_sink select wd,count(wd) cnt from sc group by wd")
t_env.execute("soc")

# Case 2: does not run successfully
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd")
t_env.execute("soc")

# Case 3: does not run successfully either
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd")
senv.execute("soc")
Hi,
execute_sql already carries the meaning of execute; the only difference is that execute_sql is asynchronous and non-blocking. So do not call execute again after execute_sql; see the documentation [1] for details. Also, if you use execute_sql and run the job locally, you need to wait on the result explicitly, otherwise the program finishes before producing any output:

result = t_env.execute_sql("your sql")
result.get_job_client().get_job_execution_result().result()

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query

Best,
Xingbo

On Wed, Aug 12, 2020 at 4:00 PM 徐振华 <[hidden email]> wrote:
> [original message quoted above]
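Putting this advice together with the original script, a minimal corrected version of the failing cases might look like the sketch below (same DDL and table names as in the question; the explicit wait matters only for local runs):

# Sketch: one execute_sql per statement, no trailing execute() call.
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
# execute_sql submits the INSERT job asynchronously and returns a TableResult.
result = t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd")
# Block until the job finishes so a local run does not exit before printing.
result.get_job_client().get_job_execution_result().result()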
In reply to this post by 徐振华
Hi
This behavior is expected: execute_sql is an asynchronous method and returns as soon as the job has been submitted. If you need to wait for the execution result, you can do so explicitly:

sql_result = t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd")
sql_result.get_job_client().get_job_execution_result().result()

Best,
Leonard Xu

> On Aug 12, 2020, at 16:00, 徐振华 <[hidden email]> wrote:
> [original message quoted above]
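As an aside beyond what the thread covers, Flink 1.11 also introduced StatementSet for submitting one or more INSERT statements as a single job; a sketch, assuming PyFlink 1.11's create_statement_set API:

# Sketch: group INSERT statements in a StatementSet and run them as one job.
stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd")
# execute() is asynchronous as well and returns a TableResult,
# so a local run still needs the explicit wait shown above.
result = stmt_set.execute()
result.get_job_client().get_job_execution_result().result()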
In reply to this post by Xingbo Huang
Thanks, I misread the documentation.
------------------ Original Message ------------------
From: "user-zh" <[hidden email]>
Sent: Wednesday, August 12, 2020 4:11 PM
To: "user-zh" <[hidden email]>
Subject: Re: pyflink 1.11.1 execute_sql / sql_update issue: of the three cases below, only the first succeeds, but that method is deprecated

> [Xingbo's reply quoted above]