关于statement输出结果疑问

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

关于statement输出结果疑问

刘杰鸿
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view("simple_source", table)
table_env.execute_sql("""
    CREATE TABLE first_sink_table (
        id BIGINT,
        data VARCHAR
    ) WITH (
        'connector' = 'print'
    )
""")
table_env.execute_sql("""
    CREATE TABLE second_sink_table (
        id BIGINT,
        data VARCHAR
    ) WITH (
        'connector' = 'print'
    )
""")
# 创建一个statement对象
statement_set = table_env.create_statement_set()
# 使用TABLE API 将table表插进first_sink_table表里面
statement_set.add_insert("first_sink_table", table)
# 使用SQL将table表插进second_sink_table表里面
statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
# 执行查询
statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4> +I(1,Hi)
4> +I(1,Hi)
4> +I(2,Hello)
4> +I(2,Hello)
Reply | Threaded
Open this post in threaded view
|

Re: 关于statement输出结果疑问

Dian Fu
可以说一下为什么你觉得输出结果应该是1,2,1,2吗?

个人觉得现在的输出结果应该没有问题。比如第1条数据,先发送到sink1,再发送到sink2,所以打印了2个1;然后处理第二条数据,打印了2个2

On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 <[hidden email]> wrote:

> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings =
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env =
> StreamTableEnvironment.create(environment_settings=env_settings)
>
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
> table_env.create_temporary_view("simple_source", table)
> table_env.execute_sql("""
>     CREATE TABLE first_sink_table (
>         id BIGINT,
>         data VARCHAR
>     ) WITH (
>         'connector' = 'print'
>     )
> """)
> table_env.execute_sql("""
>     CREATE TABLE second_sink_table (
>         id BIGINT,
>         data VARCHAR
>     ) WITH (
>         'connector' = 'print'
>     )
> """)
> # 创建一个statement对象
> statement_set = table_env.create_statement_set()
> # 使用TABLE API 将table表插进first_sink_table表里面
> statement_set.add_insert("first_sink_table", table)
> # 使用SQL将table表插进second_sink_table表里面
> statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM
> simple_source")
> # 执行查询
> statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4&gt;
> +I(1,Hi)
> 4&gt; +I(1,Hi)
> 4&gt; +I(2,Hello)
> 4&gt; +I(2,Hello)
Reply | Threaded
Open this post in threaded view
|

回复: 关于statement输出结果疑问

刘杰鸿
我的理解是代码会先把source里面的数据全部插入到sink1,这时候就是1,2。然后执行到add_insert_sql代码的时候,将source数据全部插入到sink2里面,这时候也是1,2。
所以输出应该是1,2,1,2



------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "Dian Fu"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2021年3月12日(星期五) 晚上10:24
收件人:&nbsp;"user-zh"<[hidden email]&gt;;"刘杰鸿"<[hidden email]&gt;;

主题:&nbsp;Re: 关于statement输出结果疑问



可以说一下为什么你觉得输出结果应该是1,2,1,2吗?

个人觉得现在的输出结果应该没有问题。比如第1条数据,先发送到sink1,再发送到sink2,所以打印了2个1;然后处理第二条数据,打印了2个2

On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 <[hidden email]&gt; wrote:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
 
 env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
 table_env = StreamTableEnvironment.create(environment_settings=env_settings)
 
 table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
 table_env.create_temporary_view("simple_source", table)
 table_env.execute_sql("""
 &nbsp; &nbsp; CREATE TABLE first_sink_table (
 &nbsp; &nbsp; &nbsp; &nbsp; id BIGINT,
 &nbsp; &nbsp; &nbsp; &nbsp; data VARCHAR
 &nbsp; &nbsp; ) WITH (
 &nbsp; &nbsp; &nbsp; &nbsp; 'connector' = 'print'
 &nbsp; &nbsp; )
 """)
 table_env.execute_sql("""
 &nbsp; &nbsp; CREATE TABLE second_sink_table (
 &nbsp; &nbsp; &nbsp; &nbsp; id BIGINT,
 &nbsp; &nbsp; &nbsp; &nbsp; data VARCHAR
 &nbsp; &nbsp; ) WITH (
 &nbsp; &nbsp; &nbsp; &nbsp; 'connector' = 'print'
 &nbsp; &nbsp; )
 """)
 # 创建一个statement对象
 statement_set = table_env.create_statement_set()
 # 使用TABLE API 将table表插进first_sink_table表里面
 statement_set.add_insert("first_sink_table", table)
 # 使用SQL将table表插进second_sink_table表里面
 statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
 # 执行查询
 statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4&amp;gt; +I(1,Hi)
 4&amp;gt; +I(1,Hi)
 4&amp;gt; +I(2,Hello)
 4&amp;gt; +I(2,Hello)
Reply | Threaded
Open this post in threaded view
|

Re: 关于statement输出结果疑问

Dian Fu
奥,那你理解错了。这里面其实细分成2种情况:
- sink1和sink2,通过operator chain之后,和source节点位于同一个物理节点:由于执行的时候,在一个JVM里,每处理一条数据,先发送给其中一个sink,再发送给另外一个sink。然后处理下一条数据
- sink1 和sink2,和source节点处于不同一个物理节点:这个时候是纯异步的,每处理一条数据,会分别通过网络发送给sink1和sink2,同时网络有buffer,所以sink1和sink2收到数据的顺序是完全不确定的。

但是不管是上面哪种情况,都不是先把数据全部发送给其中一个sink;再全部发送给第二个sink。

> 2021年3月12日 下午10:52,刘杰鸿 <[hidden email]> 写道:
>
> 我的理解是代码会先把source里面的数据全部插入到sink1,这时候就是1,2。然后执行到add_insert_sql代码的时候,将source数据全部插入到sink2里面,这时候也是1,2。
> 所以输出应该是1,2,1,2
>
> ------------------ 原始邮件 ------------------
> 发件人: "Dian Fu" <[hidden email]>;
> 发送时间: 2021年3月12日(星期五) 晚上10:24
> 收件人: "user-zh"<[hidden email]>;"刘杰鸿"<[hidden email]>;
> 主题: Re: 关于statement输出结果疑问
>
> 可以说一下为什么你觉得输出结果应该是1,2,1,2吗?
>
> 个人觉得现在的输出结果应该没有问题。比如第1条数据,先发送到sink1,再发送到sink2,所以打印了2个1;然后处理第二条数据,打印了2个2
>
> On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 <[hidden email] <mailto:[hidden email]>> wrote:
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
> table_env.create_temporary_view("simple_source", table)
> table_env.execute_sql("""
>     CREATE TABLE first_sink_table (
>         id BIGINT,
>         data VARCHAR
>     ) WITH (
>         'connector' = 'print'
>     )
> """)
> table_env.execute_sql("""
>     CREATE TABLE second_sink_table (
>         id BIGINT,
>         data VARCHAR
>     ) WITH (
>         'connector' = 'print'
>     )
> """)
> # 创建一个statement对象
> statement_set = table_env.create_statement_set()
> # 使用TABLE API 将table表插进first_sink_table表里面
> statement_set.add_insert("first_sink_table", table)
> # 使用SQL将table表插进second_sink_table表里面
> statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
> # 执行查询
> statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4&gt; +I(1,Hi)
> 4&gt; +I(1,Hi)
> 4&gt; +I(2,Hello)
> 4&gt; +I(2,Hello)

Reply | Threaded
Open this post in threaded view
|

回复:关于statement输出结果疑问

Shuai Xia
Hi,大佬,想问下如果使用Lazy调度模式,情况会是什么样子


------------------------------------------------------------------
发件人:Dian Fu <[hidden email]>
发送时间:2021年3月15日(星期一) 15:49
收件人:刘杰鸿 <[hidden email]>
抄 送:user-zh <[hidden email]>
主 题:Re: 关于statement输出结果疑问

奥,那你理解错了。这里面其实细分成2种情况:
- sink1和sink2,通过operator chain之后,和source节点位于同一个物理节点:由于执行的时候,在一个JVM里,每处理一条数据,先发送给其中一个sink,再发送给另外一个sink。然后处理下一条数据
- sink1 和sink2,和source节点处于不同一个物理节点:这个时候是纯异步的,每处理一条数据,会分别通过网络发送给sink1和sink2,同时网络有buffer,所以sink1和sink2收到数据的顺序是完全不确定的。

但是不管是上面哪种情况,都不是先把数据全部发送给其中一个sink;再全部发送给第二个sink。

> 2021年3月12日 下午10:52,刘杰鸿 <[hidden email]> 写道:
>
> 我的理解是代码会先把source里面的数据全部插入到sink1,这时候就是1,2。然后执行到add_insert_sql代码的时候,将source数据全部插入到sink2里面,这时候也是1,2。
> 所以输出应该是1,2,1,2
>
> ------------------ 原始邮件 ------------------
> 发件人: "Dian Fu" <[hidden email]>;
> 发送时间: 2021年3月12日(星期五) 晚上10:24
> 收件人: "user-zh"<[hidden email]>;"刘杰鸿"<[hidden email]>;
> 主题: Re: 关于statement输出结果疑问
>
> 可以说一下为什么你觉得输出结果应该是1,2,1,2吗?
>
> 个人觉得现在的输出结果应该没有问题。比如第1条数据,先发送到sink1,再发送到sink2,所以打印了2个1;然后处理第二条数据,打印了2个2
>
> On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 <[hidden email] <mailto:[hidden email]>> wrote:
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
> table_env.create_temporary_view("simple_source", table)
> table_env.execute_sql("""
>     CREATE TABLE first_sink_table (
>         id BIGINT,
>         data VARCHAR
>     ) WITH (
>         'connector' = 'print'
>     )
> """)
> table_env.execute_sql("""
>     CREATE TABLE second_sink_table (
>         id BIGINT,
>         data VARCHAR
>     ) WITH (
>         'connector' = 'print'
>     )
> """)
> # 创建一个statement对象
> statement_set = table_env.create_statement_set()
> # 使用TABLE API 将table表插进first_sink_table表里面
> statement_set.add_insert("first_sink_table", table)
> # 使用SQL将table表插进second_sink_table表里面
> statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
> # 执行查询
> statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4&gt; +I(1,Hi)
> 4&gt; +I(1,Hi)
> 4&gt; +I(2,Hello)
> 4&gt; +I(2,Hello)

Reply | Threaded
Open this post in threaded view
|

Re: 关于statement输出结果疑问

Dian Fu
流作业本身是不支持lazy模式的。

> 2021年3月17日 下午3:33,Shuai Xia <[hidden email]> 写道:
>
> Hi,大佬,想问下如果使用Lazy调度模式,情况会是什么样子
>
> ------------------------------------------------------------------
> 发件人:Dian Fu <[hidden email] <mailto:[hidden email]>>
> 发送时间:2021年3月15日(星期一) 15:49
> 收件人:刘杰鸿 <[hidden email] <mailto:[hidden email]>>
> 抄 送:user-zh <[hidden email] <mailto:[hidden email]>>
> 主 题:Re: 关于statement输出结果疑问
>
> 奥,那你理解错了。这里面其实细分成2种情况:
> - sink1和sink2,通过operator chain之后,和source节点位于同一个物理节点:由于执行的时候,在一个JVM里,每处理一条数据,先发送给其中一个sink,再发送给另外一个sink。然后处理下一条数据
> - sink1 和sink2,和source节点处于不同一个物理节点:这个时候是纯异步的,每处理一条数据,会分别通过网络发送给sink1和sink2,同时网络有buffer,所以sink1和sink2收到数据的顺序是完全不确定的。
>
> 但是不管是上面哪种情况,都不是先把数据全部发送给其中一个sink;再全部发送给第二个sink。
>
> > 2021年3月12日 下午10:52,刘杰鸿 <[hidden email]> 写道:
> >
> > 我的理解是代码会先把source里面的数据全部插入到sink1,这时候就是1,2。然后执行到add_insert_sql代码的时候,将source数据全部插入到sink2里面,这时候也是1,2。
> > 所以输出应该是1,2,1,2
> >
> > ------------------ 原始邮件 ------------------
> > 发件人: "Dian Fu" <[hidden email]>;
> > 发送时间: 2021年3月12日(星期五) 晚上10:24
> > 收件人: "user-zh"<[hidden email]>;"刘杰鸿"<[hidden email]>;
> > 主题: Re: 关于statement输出结果疑问
> >
> > 可以说一下为什么你觉得输出结果应该是1,2,1,2吗?
> >
> > 个人觉得现在的输出结果应该没有问题。比如第1条数据,先发送到sink1,再发送到sink2,所以打印了2个1;然后处理第二条数据,打印了2个2
> >
> > On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 <[hidden email] <mailto:[hidden email]> <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
> > from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> >
> > env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> > table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> >
> > table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
> > table_env.create_temporary_view("simple_source", table)
> > table_env.execute_sql("""
> >     CREATE TABLE first_sink_table (
> >         id BIGINT,
> >         data VARCHAR
> >     ) WITH (
> >         'connector' = 'print'
> >     )
> > """)
> > table_env.execute_sql("""
> >     CREATE TABLE second_sink_table (
> >         id BIGINT,
> >         data VARCHAR
> >     ) WITH (
> >         'connector' = 'print'
> >     )
> > """)
> > # 创建一个statement对象
> > statement_set = table_env.create_statement_set()
> > # 使用TABLE API 将table表插进first_sink_table表里面
> > statement_set.add_insert("first_sink_table", table)
> > # 使用SQL将table表插进second_sink_table表里面
> > statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
> > # 执行查询
> > statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4&gt; +I(1,Hi)
> > 4&gt; +I(1,Hi)
> > 4&gt; +I(2,Hello)
> > 4&gt; +I(2,Hello)