from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() table_env = StreamTableEnvironment.create(environment_settings=env_settings) table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) table_env.create_temporary_view("simple_source", table) table_env.execute_sql(""" CREATE TABLE first_sink_table ( id BIGINT, data VARCHAR ) WITH ( 'connector' = 'print' ) """) table_env.execute_sql(""" CREATE TABLE second_sink_table ( id BIGINT, data VARCHAR ) WITH ( 'connector' = 'print' ) """) # 创建一个statement对象 statement_set = table_env.create_statement_set() # 使用TABLE API 将table表插进first_sink_table表里面 statement_set.add_insert("first_sink_table", table) # 使用SQL将table表插进second_sink_table表里面 statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source") # 执行查询 statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4> +I(1,Hi) 4> +I(1,Hi) 4> +I(2,Hello) 4> +I(2,Hello) |
可以说一下为什么你觉得输出结果应该是1,2,1,2吗?
个人觉得现在的输出结果应该没有问题。比如第1条数据,先发送到sink1,再发送到sink2,所以打印了2个1;然后处理第二条数据,打印了2个2 On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 <[hidden email]> wrote: > from pyflink.table import EnvironmentSettings, StreamTableEnvironment > > env_settings = > EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > table_env = > StreamTableEnvironment.create(environment_settings=env_settings) > > table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) > table_env.create_temporary_view("simple_source", table) > table_env.execute_sql(""" > CREATE TABLE first_sink_table ( > id BIGINT, > data VARCHAR > ) WITH ( > 'connector' = 'print' > ) > """) > table_env.execute_sql(""" > CREATE TABLE second_sink_table ( > id BIGINT, > data VARCHAR > ) WITH ( > 'connector' = 'print' > ) > """) > # 创建一个statement对象 > statement_set = table_env.create_statement_set() > # 使用TABLE API 将table表插进first_sink_table表里面 > statement_set.add_insert("first_sink_table", table) > # 使用SQL将table表插进second_sink_table表里面 > statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM > simple_source") > # 执行查询 > statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4> > +I(1,Hi) > 4> +I(1,Hi) > 4> +I(2,Hello) > 4> +I(2,Hello) |
我的理解是代码会先把source里面的数据全部插入到sink1,这时候就是1,2。然后执行到add_insert_sql代码的时候,将source数据全部插入到sink2里面,这时候也是1,2。
所以输出应该是1,2,1,2 ------------------ 原始邮件 ------------------ 发件人: "Dian Fu" <[hidden email]>; 发送时间: 2021年3月12日(星期五) 晚上10:24 收件人: "user-zh"<[hidden email]>;"刘杰鸿"<[hidden email]>; 主题: Re: 关于statement输出结果疑问 可以说一下为什么你觉得输出结果应该是1,2,1,2吗? 个人觉得现在的输出结果应该没有问题。比如第1条数据,先发送到sink1,再发送到sink2,所以打印了2个1;然后处理第二条数据,打印了2个2 On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 <[hidden email]> wrote: from pyflink.table import EnvironmentSettings, StreamTableEnvironment env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() table_env = StreamTableEnvironment.create(environment_settings=env_settings) table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) table_env.create_temporary_view("simple_source", table) table_env.execute_sql(""" CREATE TABLE first_sink_table ( id BIGINT, data VARCHAR ) WITH ( 'connector' = 'print' ) """) table_env.execute_sql(""" CREATE TABLE second_sink_table ( id BIGINT, data VARCHAR ) WITH ( 'connector' = 'print' ) """) # 创建一个statement对象 statement_set = table_env.create_statement_set() # 使用TABLE API 将table表插进first_sink_table表里面 statement_set.add_insert("first_sink_table", table) # 使用SQL将table表插进second_sink_table表里面 statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source") # 执行查询 statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4&gt; +I(1,Hi) 4&gt; +I(1,Hi) 4&gt; +I(2,Hello) 4&gt; +I(2,Hello) |
奥,那你理解错了。这里面其实细分成2种情况:
- sink1和sink2,通过operator chain之后,和source节点位于同一个物理节点:由于执行的时候,在一个JVM里,每处理一条数据,先发送给其中一个sink,再发送给另外一个sink。然后处理下一条数据 - sink1 和sink2,和source节点处于不同一个物理节点:这个时候是纯异步的,每处理一条数据,会分别通过网络发送给sink1和sink2,同时网络有buffer,所以sink1和sink2收到数据的顺序是完全不确定的。 但是不管是上面哪种情况,都不是先把数据全部发送给其中一个sink;再全部发送给第二个sink。 > 2021年3月12日 下午10:52,刘杰鸿 <[hidden email]> 写道: > > 我的理解是代码会先把source里面的数据全部插入到sink1,这时候就是1,2。然后执行到add_insert_sql代码的时候,将source数据全部插入到sink2里面,这时候也是1,2。 > 所以输出应该是1,2,1,2 > > ------------------ 原始邮件 ------------------ > 发件人: "Dian Fu" <[hidden email]>; > 发送时间: 2021年3月12日(星期五) 晚上10:24 > 收件人: "user-zh"<[hidden email]>;"刘杰鸿"<[hidden email]>; > 主题: Re: 关于statement输出结果疑问 > > 可以说一下为什么你觉得输出结果应该是1,2,1,2吗? > > 个人觉得现在的输出结果应该没有问题。比如第1条数据,先发送到sink1,再发送到sink2,所以打印了2个1;然后处理第二条数据,打印了2个2 > > On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 <[hidden email] <mailto:[hidden email]>> wrote: > from pyflink.table import EnvironmentSettings, StreamTableEnvironment > > env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > table_env = StreamTableEnvironment.create(environment_settings=env_settings) > > table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) > table_env.create_temporary_view("simple_source", table) > table_env.execute_sql(""" > CREATE TABLE first_sink_table ( > id BIGINT, > data VARCHAR > ) WITH ( > 'connector' = 'print' > ) > """) > table_env.execute_sql(""" > CREATE TABLE second_sink_table ( > id BIGINT, > data VARCHAR > ) WITH ( > 'connector' = 'print' > ) > """) > # 创建一个statement对象 > statement_set = table_env.create_statement_set() > # 使用TABLE API 将table表插进first_sink_table表里面 > statement_set.add_insert("first_sink_table", table) > # 使用SQL将table表插进second_sink_table表里面 > statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source") > # 执行查询 > statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4> +I(1,Hi) > 4> +I(1,Hi) > 4> +I(2,Hello) > 4> +I(2,Hello) |
Hi,大佬,想问下如果使用Lazy调度模式,情况会是什么样子
------------------------------------------------------------------ 发件人:Dian Fu <[hidden email]> 发送时间:2021年3月15日(星期一) 15:49 收件人:刘杰鸿 <[hidden email]> 抄 送:user-zh <[hidden email]> 主 题:Re: 关于statement输出结果疑问 奥,那你理解错了。这里面其实细分成2种情况: - sink1和sink2,通过operator chain之后,和source节点位于同一个物理节点:由于执行的时候,在一个JVM里,每处理一条数据,先发送给其中一个sink,再发送给另外一个sink。然后处理下一条数据 - sink1 和sink2,和source节点处于不同一个物理节点:这个时候是纯异步的,每处理一条数据,会分别通过网络发送给sink1和sink2,同时网络有buffer,所以sink1和sink2收到数据的顺序是完全不确定的。 但是不管是上面哪种情况,都不是先把数据全部发送给其中一个sink;再全部发送给第二个sink。 > 2021年3月12日 下午10:52,刘杰鸿 <[hidden email]> 写道: > > 我的理解是代码会先把source里面的数据全部插入到sink1,这时候就是1,2。然后执行到add_insert_sql代码的时候,将source数据全部插入到sink2里面,这时候也是1,2。 > 所以输出应该是1,2,1,2 > > ------------------ 原始邮件 ------------------ > 发件人: "Dian Fu" <[hidden email]>; > 发送时间: 2021年3月12日(星期五) 晚上10:24 > 收件人: "user-zh"<[hidden email]>;"刘杰鸿"<[hidden email]>; > 主题: Re: 关于statement输出结果疑问 > > 可以说一下为什么你觉得输出结果应该是1,2,1,2吗? > > 个人觉得现在的输出结果应该没有问题。比如第1条数据,先发送到sink1,再发送到sink2,所以打印了2个1;然后处理第二条数据,打印了2个2 > > On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 <[hidden email] <mailto:[hidden email]>> wrote: > from pyflink.table import EnvironmentSettings, StreamTableEnvironment > > env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > table_env = StreamTableEnvironment.create(environment_settings=env_settings) > > table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) > table_env.create_temporary_view("simple_source", table) > table_env.execute_sql(""" > CREATE TABLE first_sink_table ( > id BIGINT, > data VARCHAR > ) WITH ( > 'connector' = 'print' > ) > """) > table_env.execute_sql(""" > CREATE TABLE second_sink_table ( > id BIGINT, > data VARCHAR > ) WITH ( > 'connector' = 'print' > ) > """) > # 创建一个statement对象 > statement_set = table_env.create_statement_set() > # 使用TABLE API 将table表插进first_sink_table表里面 > statement_set.add_insert("first_sink_table", table) > # 使用SQL将table表插进second_sink_table表里面 > statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source") > # 执行查询 > statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4> +I(1,Hi) > 4> +I(1,Hi) > 4> +I(2,Hello) > 4> +I(2,Hello) |
流作业本身是不支持lazy模式的。
> 2021年3月17日 下午3:33,Shuai Xia <[hidden email]> 写道: > > Hi,大佬,想问下如果使用Lazy调度模式,情况会是什么样子 > > ------------------------------------------------------------------ > 发件人:Dian Fu <[hidden email] <mailto:[hidden email]>> > 发送时间:2021年3月15日(星期一) 15:49 > 收件人:刘杰鸿 <[hidden email] <mailto:[hidden email]>> > 抄 送:user-zh <[hidden email] <mailto:[hidden email]>> > 主 题:Re: 关于statement输出结果疑问 > > 奥,那你理解错了。这里面其实细分成2种情况: > - sink1和sink2,通过operator chain之后,和source节点位于同一个物理节点:由于执行的时候,在一个JVM里,每处理一条数据,先发送给其中一个sink,再发送给另外一个sink。然后处理下一条数据 > - sink1 和sink2,和source节点处于不同一个物理节点:这个时候是纯异步的,每处理一条数据,会分别通过网络发送给sink1和sink2,同时网络有buffer,所以sink1和sink2收到数据的顺序是完全不确定的。 > > 但是不管是上面哪种情况,都不是先把数据全部发送给其中一个sink;再全部发送给第二个sink。 > > > 2021年3月12日 下午10:52,刘杰鸿 <[hidden email]> 写道: > > > > 我的理解是代码会先把source里面的数据全部插入到sink1,这时候就是1,2。然后执行到add_insert_sql代码的时候,将source数据全部插入到sink2里面,这时候也是1,2。 > > 所以输出应该是1,2,1,2 > > > > ------------------ 原始邮件 ------------------ > > 发件人: "Dian Fu" <[hidden email]>; > > 发送时间: 2021年3月12日(星期五) 晚上10:24 > > 收件人: "user-zh"<[hidden email]>;"刘杰鸿"<[hidden email]>; > > 主题: Re: 关于statement输出结果疑问 > > > > 可以说一下为什么你觉得输出结果应该是1,2,1,2吗? > > > > 个人觉得现在的输出结果应该没有问题。比如第1条数据,先发送到sink1,再发送到sink2,所以打印了2个1;然后处理第二条数据,打印了2个2 > > > > On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 <[hidden email] <mailto:[hidden email]> <mailto:[hidden email] <mailto:[hidden email]>>> wrote: > > from pyflink.table import EnvironmentSettings, StreamTableEnvironment > > > > env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > > table_env = StreamTableEnvironment.create(environment_settings=env_settings) > > > > table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) > > table_env.create_temporary_view("simple_source", table) > > table_env.execute_sql(""" > > CREATE TABLE first_sink_table ( > > id BIGINT, > > data VARCHAR > > ) WITH ( > > 'connector' = 'print' > > ) > > """) > > table_env.execute_sql(""" > > CREATE TABLE second_sink_table ( > > id BIGINT, > > data VARCHAR > > ) WITH ( > > 'connector' = 'print' > > ) > > """) > > # 创建一个statement对象 > > statement_set = table_env.create_statement_set() > > # 使用TABLE API 将table表插进first_sink_table表里面 > > statement_set.add_insert("first_sink_table", table) > > # 使用SQL将table表插进second_sink_table表里面 > > statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source") > > # 执行查询 > > statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4> +I(1,Hi) > > 4> +I(1,Hi) > > 4> +I(2,Hello) > > 4> +I(2,Hello) |
Free forum by Nabble | Edit this page |