各位好!
祝大家新年快乐! --版本 FLINK 1.9.1 ON YARN --过程 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据 --问题 new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉, 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。 这种计算场景有更好的计算方法吗? --部分代码 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); new ProcessWindowFunction{ public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception { for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) { .... iter.remove(); } } .... } |
Hi
使用iterator.remove() 去除state中已经计算过的数据不是一个标准做法,标准的做法应该是 clear掉相应的state [1] 至于为什么使用MemoryStateBackend会去除数据是因为 get 返回的结果是backend中on heap直接存储的对象[2],存在修改的副作用。 而RocksDB state backend get返回的结果是反序列化的list,而不是RocksDB自身存储的数据 [3],也就不存在修改的副作用了。 [1] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377 [2] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57 [3] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119 祝好 唐云 ________________________________ From: USERNAME <[hidden email]> Sent: Tuesday, January 7, 2020 17:54 To: [hidden email] <[hidden email]> Subject: FLINK 不同 StateBackend ProcessWindowFunction的差别 各位好! 祝大家新年快乐! --版本 FLINK 1.9.1 ON YARN --过程 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据 --问题 new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉, 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。 这种计算场景有更好的计算方法吗? --部分代码 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); new ProcessWindowFunction{ public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception { for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) { .... iter.remove(); } } .... } |
In reply to this post by CHENJIE
过期数据能通过TTL 设置过期吗? > 在 2020年1月7日,17:54,USERNAME <[hidden email]> 写道: > > 各位好! > 祝大家新年快乐! > > > > > --版本 > FLINK 1.9.1 ON YARN > > > --过程 > 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口 > 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出 > 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据 > --问题 > new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉, > 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。 > 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。 > 这种计算场景有更好的计算方法吗? > > > --部分代码 > final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > > new ProcessWindowFunction{ > public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception { > for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) { > .... > iter.remove(); > } > } > .... > } > > > > > > > |
我这例子需要通过 在触发器中 TriggerResult.FIRE_AND_PURGE 来清理当前计算窗口的数据,实现增量计算,跟TTL有点区别吧。
在 2020-01-07 19:51:57,"huoguo" <[hidden email]> 写道: > > >过期数据能通过TTL 设置过期吗? > >> 在 2020年1月7日,17:54,USERNAME <[hidden email]> 写道: >> >> 各位好! >> 祝大家新年快乐! >> >> >> >> >> --版本 >> FLINK 1.9.1 ON YARN >> >> >> --过程 >> 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口 >> 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出 >> 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据 >> --问题 >> new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉, >> 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。 >> 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。 >> 这种计算场景有更好的计算方法吗? >> >> >> --部分代码 >> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> >> >> new ProcessWindowFunction{ >> public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception { >> for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) { >> .... >> iter.remove(); >> } >> } >> .... >> } >> >> >> >> >> >> >> > |
TTL 好像不支持 TimeCharacteristic.EventTime 方式
在 2020-01-08 14:17:11,"USERNAME" <[hidden email]> 写道: >我这例子需要通过 在触发器中 TriggerResult.FIRE_AND_PURGE 来清理当前计算窗口的数据,实现增量计算,跟TTL有点区别吧。 > > > > > >在 2020-01-07 19:51:57,"huoguo" <[hidden email]> 写道: >> >> >>过期数据能通过TTL 设置过期吗? >> >>> 在 2020年1月7日,17:54,USERNAME <[hidden email]> 写道: >>> >>> 各位好! >>> 祝大家新年快乐! >>> >>> >>> >>> >>> --版本 >>> FLINK 1.9.1 ON YARN >>> >>> >>> --过程 >>> 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口 >>> 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出 >>> 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据 >>> --问题 >>> new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉, >>> 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。 >>> 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。 >>> 这种计算场景有更好的计算方法吗? >>> >>> >>> --部分代码 >>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); >>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >>> >>> >>> new ProcessWindowFunction{ >>> public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception { >>> for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) { >>> .... >>> iter.remove(); >>> } >>> } >>> .... >>> } >>> >>> >>> >>> >>> >>> >>> >> |
In reply to this post by Yun Tang
感谢 唐老师 解答!
在 2020-01-07 19:46:06,"Yun Tang" <[hidden email]> 写道: >Hi > >使用iterator.remove() 去除state中已经计算过的数据不是一个标准做法,标准的做法应该是 clear掉相应的state [1] >至于为什么使用MemoryStateBackend会去除数据是因为 get 返回的结果是backend中on heap直接存储的对象[2],存在修改的副作用。 >而RocksDB state backend get返回的结果是反序列化的list,而不是RocksDB自身存储的数据 [3],也就不存在修改的副作用了。 > > >[1] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377 >[2] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57 >[3] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119 > >祝好 >唐云 > >________________________________ >From: USERNAME <[hidden email]> >Sent: Tuesday, January 7, 2020 17:54 >To: [hidden email] <[hidden email]> >Subject: FLINK 不同 StateBackend ProcessWindowFunction的差别 > >各位好! >祝大家新年快乐! > > > > >--版本 >FLINK 1.9.1 ON YARN > > >--过程 >1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口 >2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出 >3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据 >--问题 >new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉, >使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。 >使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。 >这种计算场景有更好的计算方法吗? > > >--部分代码 >final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); >env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > >new ProcessWindowFunction{ >public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception { >for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) { >.... >iter.remove(); >} >} >.... >} > > > > > > > |
Free forum by Nabble | Edit this page |