|
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; ... DataStream<Integer> orangeStream = ... DataStream<Integer> greenStream = ... orangeStream.join(greenStream) .where(<KeySelector>) .equalTo(<KeySelector>) .window(TumblingEventTimeWindows.of(Time.milliseconds(2))) .apply (new JoinFunction<Integer, Integer, String> (){ @Override public String join(Integer first, Integer second) { return first + "," + second; } });简写这个例子, 我在richJoinFunction使用valueState,给每个窗口做一个汇总 发现state会一直增长不会随窗口的销毁而销毁, 请问下有什么办法实现?
|