Flink 1.10,windows 10 flink api验证
代码如下 ``` import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.util.ArrayList; import java.util.List; public class KeyedStreamJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // env.setParallelism(3); Tuple2<String, Integer> item = null; List<Tuple2<String, Integer>> items = new ArrayList<>(); item = new Tuple2<>("k1", 1); items.add(item); item = new Tuple2<>("k3", 10); items.add(item); item = new Tuple2<>("k1", 10); items.add(item); item = new Tuple2<>("k2", 2); items.add(item); item = new Tuple2<>("k1", 11); items.add(item); item = new Tuple2<>("k2", 20); items.add(item); DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromCollection(items); streamSource //by 1 //.assignTimestampsAndWatermarks(new IngestionTimeExtractor()) .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }) .window(TumblingEventTimeWindows.of(Time.milliseconds(10L))) .sum(1) .print("+++++++++++++++++++++++++++"); env.execute("keyedSteamJob"); } } ``` 输出 ``` +++++++++++++++++++++++++++:1> (k3,10) +++++++++++++++++++++++++++:2> (k1,1) +++++++++++++++++++++++++++:8> (k2,22) +++++++++++++++++++++++++++:2> (k1,21) ``` 如果把 
window(TumblingEventTimeWindows.of(Time.milliseconds(10L))) 改成 .window(TumblingEventTimeWindows.of(Time.seconds(10L))) 输出 ``` +++++++++++++++++++++++++++:8> (k2,22) +++++++++++++++++++++++++++:1> (k3,10) +++++++++++++++++++++++++++:2> (k1,22) ``` 两次不同的windows窗口,第一次输出对于key=‘k1‘不聚集,第二次输出聚集 为什么会这样,如何验证怎么样的过程处理流程导致这样的结果区别 如果k1=1已经在ValueState中(2>(k1,1)), 那么再次输出时currentKey=k1时,这个时候ValueState的value是1,那么输出应该是10+11+1,而不是10+11; 如果window改成1秒也是按照正常结果输出 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 |
如果不使用window,那么每条数据到达时都会与该key在ValueState中已有的累计值进行聚合,并立即输出当前的累计结果
输出 ``` +++++++++++++++++++++++++++:2> (k1,1) +++++++++++++++++++++++++++:1> (k3,10) +++++++++++++++++++++++++++:2> (k1,11) +++++++++++++++++++++++++++:8> (k2,2) +++++++++++++++++++++++++++:2> (k1,22) +++++++++++++++++++++++++++:8> (k2,22) ``` 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 发件人: [hidden email]<mailto:[hidden email]> 发送时间: 2020年6月1日 22:22 收件人: user-zh<mailto:[hidden email]> 主题: 关于使用IngressTime,window过小的问题 Flink 1.10,windows 10 flink api验证 代码如下 ``` import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.util.ArrayList; import java.util.List; public class KeyedStreamJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // env.setParallelism(3); Tuple2<String, Integer> item = null; List<Tuple2<String, Integer>> items = new ArrayList<>(); item = new Tuple2<>("k1", 1); items.add(item); item = new Tuple2<>("k3", 10); items.add(item); item = new Tuple2<>("k1", 10); items.add(item); item = new Tuple2<>("k2", 2); items.add(item); item = new Tuple2<>("k1", 11); items.add(item); item = new Tuple2<>("k2", 20); items.add(item); DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromCollection(items); streamSource //by 1 //.assignTimestampsAndWatermarks(new IngestionTimeExtractor()) .keyBy(new 
KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }) .window(TumblingEventTimeWindows.of(Time.milliseconds(10L))) .sum(1) .print("+++++++++++++++++++++++++++"); env.execute("keyedSteamJob"); } } ``` 输出 ``` +++++++++++++++++++++++++++:1> (k3,10) +++++++++++++++++++++++++++:2> (k1,1) +++++++++++++++++++++++++++:8> (k2,22) +++++++++++++++++++++++++++:2> (k1,21) ``` 如果把 window(TumblingEventTimeWindows.of(Time.milliseconds(10L))) 改成 .window(TumblingEventTimeWindows.of(Time.seconds(10L))) 输出 ``` +++++++++++++++++++++++++++:8> (k2,22) +++++++++++++++++++++++++++:1> (k3,10) +++++++++++++++++++++++++++:2> (k1,22) ``` 两次不同的windows窗口,第一次输出对于key=‘k1‘不聚集,第二次输出聚集 为什么会这样,如何验证怎么样的过程处理流程导致这样的结果区别 如果k1=1已经在ValueState中(2>(k1,1)), 那么再次输出时currentKey=k1时,这个时候ValueState的value是1,那么输出应该是10+11+1,而不是10+11; 如果window改成1秒也是按照正常结果输出 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 |
如果说window的10毫秒的状态ValueState被超时逐出了,可以理解。但不带window的聚合操作是否意味着所有的key的
ValueState都存在与StateBackend中,是否会无限制增长,超过集群的一些限制,比如内存、slot等会怎么样。 即使ValueState在window中被逐出,但代码中未明确指定TimeService和逐出器。 如果假设相同的key在被处理时在时空上间隔足够远,不带windows和带windows的输出结果是否还会不同? 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 发件人: [hidden email]<mailto:[hidden email]> 发送时间: 2020年6月1日 22:27 收件人: [hidden email]<mailto:[hidden email]> 主题: 回复: 关于使用IngressTime,window过小的问题 如果不使用window,那么输出会按照ValueState的存量的key的ValueState聚合 输出 ``` +++++++++++++++++++++++++++:2> (k1,1) +++++++++++++++++++++++++++:1> (k3,10) +++++++++++++++++++++++++++:2> (k1,11) +++++++++++++++++++++++++++:8> (k2,2) +++++++++++++++++++++++++++:2> (k1,22) +++++++++++++++++++++++++++:8> (k2,22) ``` 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 发件人: [hidden email]<mailto:[hidden email]> 发送时间: 2020年6月1日 22:22 收件人: user-zh<mailto:[hidden email]> 主题: 关于使用IngressTime,window过小的问题 Flink 1.10,windows 10 flink api验证 代码如下 ``` import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.util.ArrayList; import java.util.List; public class KeyedStreamJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // env.setParallelism(3); Tuple2<String, Integer> item = null; List<Tuple2<String, Integer>> items = new ArrayList<>(); item = new Tuple2<>("k1", 1); items.add(item); item = 
new Tuple2<>("k3", 10); items.add(item); item = new Tuple2<>("k1", 10); items.add(item); item = new Tuple2<>("k2", 2); items.add(item); item = new Tuple2<>("k1", 11); items.add(item); item = new Tuple2<>("k2", 20); items.add(item); DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromCollection(items); streamSource //by 1 //.assignTimestampsAndWatermarks(new IngestionTimeExtractor()) .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }) .window(TumblingEventTimeWindows.of(Time.milliseconds(10L))) .sum(1) .print("+++++++++++++++++++++++++++"); env.execute("keyedSteamJob"); } } ``` 输出 ``` +++++++++++++++++++++++++++:1> (k3,10) +++++++++++++++++++++++++++:2> (k1,1) +++++++++++++++++++++++++++:8> (k2,22) +++++++++++++++++++++++++++:2> (k1,21) ``` 如果把 window(TumblingEventTimeWindows.of(Time.milliseconds(10L))) 改成 .window(TumblingEventTimeWindows.of(Time.seconds(10L))) 输出 ``` +++++++++++++++++++++++++++:8> (k2,22) +++++++++++++++++++++++++++:1> (k3,10) +++++++++++++++++++++++++++:2> (k1,22) ``` 两次不同的windows窗口,第一次输出对于key=‘k1‘不聚集,第二次输出聚集 为什么会这样,如何验证怎么样的过程处理流程导致这样的结果区别 如果k1=1已经在ValueState中(2>(k1,1)), 那么再次输出时currentKey=k1时,这个时候ValueState的value是1,那么输出应该是10+11+1,而不是10+11; 如果window改成1秒也是按照正常结果输出 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 |
把 Time.milliseconds(10L) 改成 Time.seconds(10L) 后,其实是改变了时间窗口的大小:窗口从 10 毫秒变成了 10 秒,这会使以前被分在两个窗口里的数据(例如 k1 的 1 与 10、11)现在落在同一个窗口里,而聚合的时候是按照窗口进行聚合的,所以结果变了(由 (k1,1)、(k1,21) 变成 (k1,22))。
在 2020-06-01 22:41:14,"[hidden email]" <[hidden email]> 写道: >如果说window的10毫秒的状态ValueState被超时逐出了,可以理解。但不带window的聚合操作是否意味着所有的key的 >ValueState都存在与StateBackend中,是否会无限制增长,超过集群的一些限制,比如内存、slot等会怎么样。 >即使ValueState在window中被逐出,但代码中未明确指定TimeService和逐出器。 > >如果假设相同的key在被处理时在时空上间隔足够远,不带windows和带windows的输出结果是否还会不同? > > >发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 > >发件人: [hidden email]<mailto:[hidden email]> >发送时间: 2020年6月1日 22:27 >收件人: [hidden email]<mailto:[hidden email]> >主题: 回复: 关于使用IngressTime,window过小的问题 > >如果不使用window,那么输出会按照ValueState的存量的key的ValueState聚合 >输出 >``` >+++++++++++++++++++++++++++:2> (k1,1) >+++++++++++++++++++++++++++:1> (k3,10) >+++++++++++++++++++++++++++:2> (k1,11) >+++++++++++++++++++++++++++:8> (k2,2) >+++++++++++++++++++++++++++:2> (k1,22) >+++++++++++++++++++++++++++:8> (k2,22) >``` > >发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 > >发件人: [hidden email]<mailto:[hidden email]> >发送时间: 2020年6月1日 22:22 >收件人: user-zh<mailto:[hidden email]> >主题: 关于使用IngressTime,window过小的问题 > >Flink 1.10,windows 10 flink api验证 > >代码如下 >``` > >import org.apache.flink.api.java.functions.KeySelector; >import org.apache.flink.api.java.tuple.Tuple2; >import org.apache.flink.streaming.api.TimeCharacteristic; >import org.apache.flink.streaming.api.datastream.DataStreamSource; >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; >import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; >import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; >import org.apache.flink.streaming.api.windowing.time.Time; > >import java.util.ArrayList; >import java.util.List; > >public class KeyedStreamJob { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); > 
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); >// env.setParallelism(3); > > Tuple2<String, Integer> item = null; > List<Tuple2<String, Integer>> items = new ArrayList<>(); > item = new Tuple2<>("k1", 1); > items.add(item); > item = new Tuple2<>("k3", 10); > items.add(item); > item = new Tuple2<>("k1", 10); > items.add(item); > item = new Tuple2<>("k2", 2); > items.add(item); > item = new Tuple2<>("k1", 11); > items.add(item); > item = new Tuple2<>("k2", 20); > items.add(item); > DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromCollection(items); > streamSource > //by 1 > //.assignTimestampsAndWatermarks(new IngestionTimeExtractor()) > .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { > @Override > public String getKey(Tuple2<String, Integer> value) throws Exception { > return value.f0; > } > }) > .window(TumblingEventTimeWindows.of(Time.milliseconds(10L))) > .sum(1) > .print("+++++++++++++++++++++++++++"); > > env.execute("keyedSteamJob"); > } >} > >``` >输出 >``` >+++++++++++++++++++++++++++:1> (k3,10) >+++++++++++++++++++++++++++:2> (k1,1) >+++++++++++++++++++++++++++:8> (k2,22) >+++++++++++++++++++++++++++:2> (k1,21) >``` >如果把 > >window(TumblingEventTimeWindows.of(Time.milliseconds(10L))) >改成 > >.window(TumblingEventTimeWindows.of(Time.seconds(10L))) >输出 >``` >+++++++++++++++++++++++++++:8> (k2,22) >+++++++++++++++++++++++++++:1> (k3,10) >+++++++++++++++++++++++++++:2> (k1,22) >``` >两次不同的windows窗口,第一次输出对于key=‘k1‘不聚集,第二次输出聚集 > >为什么会这样,如何验证怎么样的过程处理流程导致这样的结果区别 > >如果k1=1已经在ValueState中(2>(k1,1)), >那么再次输出时currentKey=k1时,这个时候ValueState的value是1,那么输出应该是10+11+1,而不是10+11; > > >如果window改成1秒也是按照正常结果输出 > > > > > >发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 > > |
Free forum by Nabble | Edit this page |