Hi,
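Without a window, reduce processes records one at a time. After keyBy the stream is grouped by key, but with no window it is still an unbounded stream. Each incoming record is reduced with the current result for its key, and the updated result is emitted downstream immediately (a "rolling" reduce). Once you apply a window, you can think of each window as a small batch. The reduce function is still called pairwise as records arrive, but only the final result is emitted when the window fires: one record per key per window. If your reduce function also contains a rule, for example comparing a timestamp and keeping only the most recent of the duplicates, then the record emitted for each key is the latest one. You can easily verify this with a small demo. Below is a test I ran locally that you can try as well. Hope this helps; my knowledge is limited, so apologies if anything I said is wrong!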
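import com.fasterxml.jackson.databind.ObjectMapper; // requires the jackson-databind dependency
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReduceTest {
    private static final Logger logger = LoggerFactory.getLogger(ReduceTest.class);
    // One shared ObjectMapper (thread-safe once configured); static fields are not serialized with the functions.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Assumed definition: ReduceEntity was not included in the mail, so this is a minimal POJO matching the JSON lines.
    public static class ReduceEntity {
        private String name;
        private int age;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    // env.execute() throws Exception, so main has to declare it.
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Ingestion time: timestamps and watermarks are assigned automatically when records enter the job.
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        DataStream<String> source = env.readTextFile("/Users/allanqin/myprojects/spend-report/demo/src/main/resources/reduce.txt");
        source
                // Group records by the "name" field of each JSON line.
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String s) throws Exception {
                        return MAPPER.readValue(s, ReduceEntity.class).getName();
                    }
                }, TypeInformation.of(new TypeHint<String>() {
                }))
                // 5-second tumbling window: one output record per key per window.
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<String>() {
                    @Override
                    public String reduce(String v1, String v2) throws Exception {
                        // v1 is the result so far for this key and window, v2 is the newly arrived record.
                        // Returning v2 keeps the last record; only the final value is emitted when the window fires.
                        // To keep the record with the larger age instead, use:
                        // ReduceEntity e1 = MAPPER.readValue(v1, ReduceEntity.class);
                        // ReduceEntity e2 = MAPPER.readValue(v2, ReduceEntity.class);
                        // return e1.getAge() > e2.getAge() ? v1 : v2;
                        return v2;
                    }
                })
                .print();
        env.execute("test");
    }
}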
Contents of the txt file (reduce.txt):
{ "name" : "allanqinjy", "age" : 4 }
{ "name" : "allanqinjy", "age" : 45 }
{ "name" : "allanqinjy", "age" : 6 }
{ "name" : "allanqinjy", "age" : 9 }
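To see the two behaviors side by side without starting a Flink job, here is a minimal plain-Java sketch that models them. It is not Flink's implementation: `ReduceSemantics`, `Entity`, `rollingReduce` and `windowedReduce` are hypothetical names, and the windowed version models a single window whose contents are already known.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class ReduceSemantics {

    /** Hypothetical stand-in for one parsed line of reduce.txt. */
    static final class Entity {
        final String name;
        final int age;
        Entity(String name, int age) { this.name = name; this.age = age; }
        @Override public String toString() { return name + ":" + age; }
    }

    /** Models keyBy(...).reduce(...) without a window: one output per input record. */
    static List<Entity> rollingReduce(List<Entity> input, BinaryOperator<Entity> fn) {
        Map<String, Entity> state = new HashMap<>();   // running result per key
        List<Entity> out = new ArrayList<>();
        for (Entity e : input) {
            Entity acc = state.get(e.name);
            Entity result = (acc == null) ? e : fn.apply(acc, e); // first record of a key passes through
            state.put(e.name, result);
            out.add(result);                            // emitted downstream immediately
        }
        return out;
    }

    /** Models keyBy(...).window(...).reduce(...) for one window: fold as records arrive, emit once per key. */
    static List<Entity> windowedReduce(List<Entity> windowContents, BinaryOperator<Entity> fn) {
        Map<String, Entity> state = new LinkedHashMap<>(); // keeps first-seen key order
        for (Entity e : windowContents) {
            state.merge(e.name, e, fn);                 // incremental, nothing emitted yet
        }
        return new ArrayList<>(state.values());         // window fires: one result per key
    }

    public static void main(String[] args) {
        List<Entity> input = List.of(
                new Entity("allanqinjy", 4), new Entity("allanqinjy", 45),
                new Entity("allanqinjy", 6), new Entity("allanqinjy", 9));
        BinaryOperator<Entity> keepLast = (v1, v2) -> v2;                        // rule used in the demo
        BinaryOperator<Entity> keepMaxAge = (v1, v2) -> v1.age > v2.age ? v1 : v2; // commented-out rule

        System.out.println("rolling, keepLast:    " + rollingReduce(input, keepLast));
        System.out.println("windowed, keepLast:   " + windowedReduce(input, keepLast));
        System.out.println("windowed, keepMaxAge: " + windowedReduce(input, keepMaxAge));
    }
}
```

With the four sample records, the rolling case produces four results (one per record), while each windowed case produces a single result for the key: age 9 with the keep-last rule and age 45 with the max-age rule. This matches the answer to the question: inside a window, reduce is still applied pairwise as each element arrives, but the intermediate results stay in state and only the final one goes downstream when the window fires.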
On 2020-09-12 17:57:01, "ゞ野蠻遊戲χ" <[hidden email]> wrote:
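Hi all,
When a reduce operator is used after a window operator, does it apply the reduce function to all elements of the current window and then emit only one record downstream? Or does it emit a record downstream every time two consecutive elements of the window are passed through the reduce function?
Thanks
嘉治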