正好看到这一部分,还是有的,你考虑下滑动的计数窗口
[1] 会在 fire 之后把整个 windowState 丢掉,[2] 其实会重新计算 evict 之后的 windowState Best, tison. USERNAME <[hidden email]> 于2020年1月21日周二 下午5:21写道: > 大家,新年快乐~ > > > [1] TriggerResult.FIRE_AND_PURGE > > https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74 > [2] CountEvictor > > https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377 > > |
evict 丢弃掉的数据,在内存或者RocksDB中也会同步删除吗?
在 2020-01-21 17:27:38,"tison" <[hidden email]> 写道: >正好看到这一部分,还是有的,你考虑下滑动的计数窗口 > >[1] 会在 fire 之后把整个 windowState 丢掉,[2] 其实会重新计算 evict 之后的 windowState > >Best, >tison. > > >USERNAME <[hidden email]> 于2020年1月21日周二 下午5:21写道: > >> 大家,新年快乐~ >> >> >> [1] TriggerResult.FIRE_AND_PURGE >> >> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74 >> [2] CountEvictor >> >> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377 >> >> |
你读一下 EvictingWindowOperator 相关代码或者说 Evictor#evictBefore 的调用链,里面关于 window
state 的处理是比较 hack 的,用文字说也起不到简练的作用 private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); // Work around type system restrictions... FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable .from(contents) .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() { @Override public TimestampedValue<IN> apply(StreamRecord<IN> input) { return TimestampedValue.from(input); } }); evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp)); FluentIterable<IN> projectedContents = recordsWithTimestamp .transform(new Function<TimestampedValue<IN>, IN>() { @Override public IN apply(TimestampedValue<IN> input) { return input.getValue(); } }); processContext.window = triggerContext.window; userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector); evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp)); //work around to fix FLINK-4369, remove the evicted elements from the windowState. //this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState. windowState.clear(); for (TimestampedValue<IN> record : recordsWithTimestamp) { windowState.add(record.getStreamRecord()); } } Best, tison. USERNAME <[hidden email]> 于2020年1月21日周二 下午8:25写道: > evict 丢弃掉的数据,在内存或者RocksDB中也会同步删除吗? > > > > > > > 在 2020-01-21 17:27:38,"tison" <[hidden email]> 写道: > >正好看到这一部分,还是有的,你考虑下滑动的计数窗口 > > > >[1] 会在 fire 之后把整个 windowState 丢掉,[2] 其实会重新计算 evict 之后的 windowState > > > >Best, > >tison. > > > > > >USERNAME <[hidden email]> 于2020年1月21日周二 下午5:21写道: > > > >> 大家,新年快乐~ > >> > >> > >> [1] TriggerResult.FIRE_AND_PURGE > >> > >> > https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74 > >> [2] CountEvictor > >> > >> > https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377 > >> > >> > |
Free forum by Nabble | Edit this page |