FLINK 不同 StateBackend ProcessWindowFunction的差别

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

FLINK 不同 StateBackend ProcessWindowFunction的差别

CHENJIE
各位好!
祝大家新年快乐!




--版本
FLINK 1.9.1 ON YARN


--过程
1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
--问题
new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
这种计算场景有更好的计算方法吗?


--部分代码
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


new ProcessWindowFunction{
public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception {
for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) {
....
iter.remove();
}
}
....
}







Reply | Threaded
Open this post in threaded view
|

Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

Yun Tang
Hi

使用iterator.remove() 去除state中已经计算过的数据不是一个标准做法,标准的做法应该是 clear掉相应的state [1]
至于为什么使用MemoryStateBackend会去除数据是因为 get 返回的结果是backend中on heap直接存储的对象[2],存在修改的副作用。
而RocksDB state backend get返回的结果是反序列化的list,而不是RocksDB自身存储的数据 [3],也就不存在修改的副作用了。


[1] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
[2] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57
[3] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119

祝好
唐云

________________________________
From: USERNAME <[hidden email]>
Sent: Tuesday, January 7, 2020 17:54
To: [hidden email] <[hidden email]>
Subject: FLINK 不同 StateBackend ProcessWindowFunction的差别

各位好!
祝大家新年快乐!




--版本
FLINK 1.9.1 ON YARN


--过程
1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
--问题
new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
这种计算场景有更好的计算方法吗?


--部分代码
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


new ProcessWindowFunction{
public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception {
for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) {
....
iter.remove();
}
}
....
}







Reply | Threaded
Open this post in threaded view
|

Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

Michael Ran
In reply to this post by CHENJIE


过期数据能通过TTL 设置过期吗?

> 在 2020年1月7日,17:54,USERNAME <[hidden email]> 写道:
>
> 各位好!
> 祝大家新年快乐!
>
>
>
>
> --版本
> FLINK 1.9.1 ON YARN
>
>
> --过程
> 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
> 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
> 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
> --问题
> new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
> 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
> 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
> 这种计算场景有更好的计算方法吗?
>
>
> --部分代码
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
> new ProcessWindowFunction{
> public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception {
> for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) {
> ....
> iter.remove();
> }
> }
> ....
> }
>
>
>
>
>
>
>


Reply | Threaded
Open this post in threaded view
|

Re:Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

CHENJIE
我这例子需要通过 在触发器中 TriggerResult.FIRE_AND_PURGE 来清理当前计算窗口的数据,实现增量计算,跟TTL有点区别吧。





在 2020-01-07 19:51:57,"huoguo" <[hidden email]> 写道:

>
>
>过期数据能通过TTL 设置过期吗?
>
>> 在 2020年1月7日,17:54,USERNAME <[hidden email]> 写道:
>>
>> 各位好!
>> 祝大家新年快乐!
>>
>>
>>
>>
>> --版本
>> FLINK 1.9.1 ON YARN
>>
>>
>> --过程
>> 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
>> 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
>> 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
>> --问题
>> new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
>> 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
>> 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
>> 这种计算场景有更好的计算方法吗?
>>
>>
>> --部分代码
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>
>> new ProcessWindowFunction{
>> public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception {
>> for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) {
>> ....
>> iter.remove();
>> }
>> }
>> ....
>> }
>>
>>
>>
>>
>>
>>
>>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re:Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

CHENJIE
TTL 好像不支持 TimeCharacteristic.EventTime 方式



在 2020-01-08 14:17:11,"USERNAME" <[hidden email]> 写道:

>我这例子需要通过 在触发器中 TriggerResult.FIRE_AND_PURGE 来清理当前计算窗口的数据,实现增量计算,跟TTL有点区别吧。
>
>
>
>
>
>在 2020-01-07 19:51:57,"huoguo" <[hidden email]> 写道:
>>
>>
>>过期数据能通过TTL 设置过期吗?
>>
>>> 在 2020年1月7日,17:54,USERNAME <[hidden email]> 写道:
>>>
>>> 各位好!
>>> 祝大家新年快乐!
>>>
>>>
>>>
>>>
>>> --版本
>>> FLINK 1.9.1 ON YARN
>>>
>>>
>>> --过程
>>> 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
>>> 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
>>> 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
>>> --问题
>>> new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
>>> 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
>>> 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
>>> 这种计算场景有更好的计算方法吗?
>>>
>>>
>>> --部分代码
>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>>
>>> new ProcessWindowFunction{
>>> public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception {
>>> for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) {
>>> ....
>>> iter.remove();
>>> }
>>> }
>>> ....
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

CHENJIE
In reply to this post by Yun Tang
感谢 唐老师 解答!

在 2020-01-07 19:46:06,"Yun Tang" <[hidden email]> 写道:

>Hi
>
>使用iterator.remove() 去除state中已经计算过的数据不是一个标准做法,标准的做法应该是 clear掉相应的state [1]
>至于为什么使用MemoryStateBackend会去除数据是因为 get 返回的结果是backend中on heap直接存储的对象[2],存在修改的副作用。
>而RocksDB state backend get返回的结果是反序列化的list,而不是RocksDB自身存储的数据 [3],也就不存在修改的副作用了。
>
>
>[1] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
>[2] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57
>[3] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119
>
>祝好
>唐云
>
>________________________________
>From: USERNAME <[hidden email]>
>Sent: Tuesday, January 7, 2020 17:54
>To: [hidden email] <[hidden email]>
>Subject: FLINK 不同 StateBackend ProcessWindowFunction的差别
>
>各位好!
>祝大家新年快乐!
>
>
>
>
>--版本
>FLINK 1.9.1 ON YARN
>
>
>--过程
>1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
>2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
>3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
>--问题
>new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
>使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
>使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
>这种计算场景有更好的计算方法吗?
>
>
>--部分代码
>final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
>new ProcessWindowFunction{
>public void process(Tuple tuple, Context context, Iterable<StringBean> elements, Collector<String> out) throws Exception {
>for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) {
>....
>iter.remove();
>}
>}
>....
>}
>
>
>
>
>
>
>