|
Hi All
This My Coding:
statDataStream
.map(new InnerStatMap(logType))
.uid("InnerStatMap").name("InnerStatMap")
.keyBy(new InnerKeySelector)
.timeWindow(Time.seconds(statTimeWindow))
.reduce(new InnerStatReduce)
.uid("InnerReduce").name("InnerReduce")
.addSink(innerStatProducer)
.uid("InnerSink").name("InnerSink")
.setParallelism(sinkParallelism)
I find checkpoint state is becoming increasingly over time
best wishes
|