pyflink 1.11.1 execute_sql / sql_update question: of the three cases below, only the first runs successfully, but that method is deprecated


徐振华
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# pyflink 1.11.1
environment_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
senv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(senv, environment_settings=environment_settings)
source_ddl = "create table sc(wd varchar,cnt int,c int,hu varchar)with('connector'='filesystem','path'='/home/xzh/mypython/bigdata/flink/stream/input/','format'='csv')"
sink_ddl = "create table print_sink(wd varchar,cnt bigint)with('connector'='print')"

# The following runs correctly
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.sql_update("insert into print_sink select wd,count(wd) cnt from sc group by wd")
t_env.execute("soc")

# The following does not run correctly
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd")
t_env.execute("soc")


# This also does not run correctly
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd")
senv.execute("soc")

Re: pyflink 1.11.1 execute_sql / sql_update question: of the three cases below, only the first runs successfully, but that method is deprecated

Xingbo Huang
Hi,
execute_sql already covers what execute does; the only difference is that execute_sql is asynchronous and non-blocking. So don't follow execute_sql with another call to execute; see the documentation [1] for details. Also, if you use execute_sql and run the job locally, you need to do the following, otherwise the program will exit before producing any results.
result = t_env.execute_sql("your SQL statement")
result.get_job_client().get_job_execution_result().result()
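Put together with the original program, a minimal end-to-end sketch might look like this. The DDL strings and paths are copied from the question; this is an untested illustration, and the imports are kept inside the function so the sketch can be defined even where pyflink is not installed:

```python
def run_wordcount():
    # pyflink imports kept local to this sketch function.
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    settings = (
        EnvironmentSettings.new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build()
    )
    senv = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(senv, environment_settings=settings)

    # DDL statements: these execute_sql calls complete synchronously.
    t_env.execute_sql(
        "create table sc(wd varchar,cnt int,c int,hu varchar)"
        "with('connector'='filesystem',"
        "'path'='/home/xzh/mypython/bigdata/flink/stream/input/',"
        "'format'='csv')"
    )
    t_env.execute_sql(
        "create table print_sink(wd varchar,cnt bigint)"
        "with('connector'='print')"
    )

    # The INSERT is submitted asynchronously and returns a TableResult;
    # block on it explicitly, and do not call execute() afterwards.
    result = t_env.execute_sql(
        "insert into print_sink select wd,count(wd) cnt from sc group by wd"
    )
    result.get_job_client().get_job_execution_result().result()
```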

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query

Best,
Xingbo

徐振华 <[hidden email]> wrote on Wed, Aug 12, 2020 at 4:00 PM:

Re: pyflink 1.11.1 execute_sql / sql_update question: of the three cases below, only the first runs successfully, but that method is deprecated

Leonard Xu
In reply to this post by 徐振华
Hi,
This behavior is expected.
execute_sql is an asynchronous method: it returns as soon as the job is submitted. If you need to wait for the execution result, you can block on it explicitly like this:
sql_result = t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd")
sql_result.get_job_client().get_job_execution_result().result()
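The submit-then-wait behavior here has the same shape as a plain Python Future: submission returns a handle immediately, and result() blocks until the job completes. A pyflink-free analogy using only the standard library:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def job():
    # Stand-in for the streaming job: takes a moment, then finishes.
    time.sleep(0.1)
    return "done"

with ThreadPoolExecutor(max_workers=1) as pool:
    handle = pool.submit(job)   # returns immediately, like execute_sql
    result = handle.result()    # blocks until the job finishes, like
                                # get_job_execution_result().result()
```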

Best,
Leonard Xu

> On Aug 12, 2020, at 16:00, 徐振华 <[hidden email]> wrote:

Re: pyflink 1.11.1 execute_sql / sql_update question: of the three cases below, only the first runs successfully, but that method is deprecated

徐振华
In reply to this post by Xingbo Huang
Thanks, I had misread the documentation.

------------------ Original message ------------------
From: "user-zh" <[hidden email]>
Date: Wed, Aug 12, 2020, 4:11 PM
To: "user-zh" <[hidden email]>
Subject: Re: pyflink 1.11.1 execute_sql / sql_update question: of the three cases below, only the first runs successfully, but that method is deprecated


