双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

xuefli
遇到一个非常头痛的问题

Flink1.10的集群,用hdfs做backend

一个流aStream准备了10亿的数据,另外一个流bStream百万
如果如下操作
我遇到一个问题 双流Join 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同 。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 再对cStream进行keyBy-->timeWindow-->sum.
我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
但数据量很大时,就会这样。


每次计算的结果不一样,这个对业务系统挑战巨大


发送自 Windows 10 版邮件应用

Reply | Threaded
Open this post in threaded view
|

Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

tison
IngestionTime 多次运行结果不一样很正常啊,试试 event time?

Best,
tison.


xuefli <[hidden email]> 于2020年4月15日周三 下午10:10写道:

> 遇到一个非常头痛的问题
>
> Flink1.10的集群,用hdfs做backend
>
> 一个流aStream准备了10亿的数据,另外一个流bStream百万
> 如果如下操作
>
> 我遇到一个问题 双流Join 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同 。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 再对cStream进行keyBy-->timeWindow-->sum.
> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
> 但数据量很大时,就会这样。
>
>
> 每次计算的结果不一样,这个对业务系统挑战巨大
>
>
> 发送自 Windows 10 版邮件应用
>
>
Reply | Threaded
Open this post in threaded view
|

Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

tison
FYI

https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html

IngestionTime 的时间基准是进入 Flink 系统即 Source 算子生产数据时附上的时间戳,你后面有
window/join,这个时间的间隔不是确定性的,大数据量下因为 GC 等原因有不同延迟从而导致 Window 划分不一致是正常的。即使使用
EventTime,在 Watermark
不够可靠的情况下也有可能不一致,足够可靠又可能受到滞后数据的影响损失性能并占用内存。可以看看上面文档的相关内容。

Best,
tison.


tison <[hidden email]> 于2020年4月15日周三 下午10:18写道:

> IngestionTime 多次运行结果不一样很正常啊,试试 event time?
>
> Best,
> tison.
>
>
> xuefli <[hidden email]> 于2020年4月15日周三 下午10:10写道:
>
>> 遇到一个非常头痛的问题
>>
>> Flink1.10的集群,用hdfs做backend
>>
>> 一个流aStream准备了10亿的数据,另外一个流bStream百万
>> 如果如下操作
>>
>> 我遇到一个问题 双流Join 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同 。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 再对cStream进行keyBy-->timeWindow-->sum.
>> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
>> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
>> 但数据量很大时,就会这样。
>>
>>
>> 每次计算的结果不一样,这个对业务系统挑战巨大
>>
>>
>> 发送自 Windows 10 版邮件应用
>>
>>
Reply | Threaded
Open this post in threaded view
|

回复: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

xuefli@outlook.com
双流join对数据和环境的要求很严格,这样适用的业务场景就会有限。没有办法保证环境和数据一致是严格符合的

发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用

发件人: tison<mailto:[hidden email]>
发送时间: 2020年4月15日 22:26
收件人: user-zh<mailto:[hidden email]>
主题: Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

FYI

https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html

IngestionTime 的时间基准是进入 Flink 系统即 Source 算子生产数据时附上的时间戳,你后面有
window/join,这个时间的间隔不是确定性的,大数据量下因为 GC 等原因有不同延迟从而导致 Window 划分不一致是正常的。即使使用
EventTime,在 Watermark
不够可靠的情况下也有可能不一致,足够可靠又可能受到滞后数据的影响损失性能并占用内存。可以看看上面文档的相关内容。

Best,
tison.


tison <[hidden email]> 于2020年4月15日周三 下午10:18写道:

> IngestionTime 多次运行结果不一样很正常啊,试试 event time?
>
> Best,
> tison.
>
>
> xuefli <[hidden email]> 于2020年4月15日周三 下午10:10写道:
>
>> 遇到一个非常头痛的问题
>>
>> Flink1.10的集群,用hdfs做backend
>>
>> 一个流aStream准备了10亿的数据,另外一个流bStream百万
>> 如果如下操作
>>
>> 我遇到一个问题 双流Join 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同 。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 再对cStream进行keyBy-->timeWindow-->sum.
>> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
>> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
>> 但数据量很大时,就会这样。
>>
>>
>> 每次计算的结果不一样,这个对业务系统挑战巨大
>>
>>
>> 发送自 Windows 10 版邮件应用
>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

Benchao Li
我感觉双流join如果要保证结果是一致的,需要用事件时间,而不是处理时间或者是摄入时间。
如果可能,建议尝试下基于事件时间的双流join。

[hidden email] <[hidden email]> 于2020年4月16日周四 上午9:15写道:

> 双流join对数据和环境的要求很严格,这样适用的业务场景就会有限。没有办法保证环境和数据一致是严格符合的
>
> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
> 发件人: tison<mailto:[hidden email]>
> 发送时间: 2020年4月15日 22:26
> 收件人: user-zh<mailto:[hidden email]>
> 主题: Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题
>
> FYI
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html
>
> IngestionTime 的时间基准是进入 Flink 系统即 Source 算子生产数据时附上的时间戳,你后面有
> window/join,这个时间的间隔不是确定性的,大数据量下因为 GC 等原因有不同延迟从而导致 Window 划分不一致是正常的。即使使用
> EventTime,在 Watermark
> 不够可靠的情况下也有可能不一致,足够可靠又可能受到滞后数据的影响损失性能并占用内存。可以看看上面文档的相关内容。
>
> Best,
> tison.
>
>
> tison <[hidden email]> 于2020年4月15日周三 下午10:18写道:
>
> > IngestionTime 多次运行结果不一样很正常啊,试试 event time?
> >
> > Best,
> > tison.
> >
> >
> > xuefli <[hidden email]> 于2020年4月15日周三 下午10:10写道:
> >
> >> 遇到一个非常头痛的问题
> >>
> >> Flink1.10的集群,用hdfs做backend
> >>
> >> 一个流aStream准备了10亿的数据,另外一个流bStream百万
> >> 如果如下操作
> >>
> >> 我遇到一个问题 双流Join
> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
> 。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后
> 再对cStream进行keyBy-->timeWindow-->sum.
> >> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
> >> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
> >> 但数据量很大时,就会这样。
> >>
> >>
> >> 每次计算的结果不一样,这个对业务系统挑战巨大
> >>
> >>
> >> 发送自 Windows 10 版邮件应用
> >>
> >>
>
>

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

回复: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

xuefli@outlook.com
双流join涉及的问题我罗列完整一下:
前提:
假设有两个流
其中一个流aStream非常庞大,基于时间单位是秒,源源不断的生产数据
另外一个流bStream也是很大,是不太可能基于内存做维度数据全局缓冲或者LRU淘汰,因为aStream使用bStream足够分散和随机,基于时间单位是天或者月,会持续不断的变化,部分数据或者长期不变
影响流的因子:

1、         系统集群资源,主要是内存

2、        流速

3、        不同流数据变化的时间单位不一致

4、        同一流内数据变化的时间单位不一致
目标:
因两个流的数据都原样的保留下来,重算时,要保持每次运算结果是一致的

对于操作bStream.join(aStream).windows().apply()
如果是基于eventTime的问题是
对于aStream可以按照每个时间窗口处理数据,合适的随着时间的流式划分窗口处理,
但对于bStream,因每个部分的数据的有效时间范围不同,bStream的数据是长期驻留在state,还是超过window就被淘汰,如果是被淘汰,那么计算结果肯定有问题,即aStream中的数据肯定从业务上可以匹配到bStream中的数据

如果是基于ingressTime的问题是
aStream和bStream都受系统运行环境的影响,但如果有办法对于aStream在任何一个window中的数据都能匹配到bStream的数据,肯定没有问题

那么剩下的关键问题就是:

A、        对于aStream在windows中的数据如何一定匹配到bStream中的数据

B、        对于bStream中的数据每条数据的可用时间范围是变化的,如何保持更新

发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用

发件人: Benchao Li<mailto:[hidden email]>
发送时间: 2020年4月16日 10:21
收件人: user-zh<mailto:[hidden email]>
主题: Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

我感觉双流join如果要保证结果是一致的,需要用事件时间,而不是处理时间或者是摄入时间。
如果可能,建议尝试下基于事件时间的双流join。

[hidden email] <[hidden email]> 于2020年4月16日周四 上午9:15写道:

> 双流join对数据和环境的要求很严格,这样适用的业务场景就会有限。没有办法保证环境和数据一致是严格符合的
>
> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
> 发件人: tison<mailto:[hidden email]>
> 发送时间: 2020年4月15日 22:26
> 收件人: user-zh<mailto:[hidden email]>
> 主题: Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题
>
> FYI
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html
>
> IngestionTime 的时间基准是进入 Flink 系统即 Source 算子生产数据时附上的时间戳,你后面有
> window/join,这个时间的间隔不是确定性的,大数据量下因为 GC 等原因有不同延迟从而导致 Window 划分不一致是正常的。即使使用
> EventTime,在 Watermark
> 不够可靠的情况下也有可能不一致,足够可靠又可能受到滞后数据的影响损失性能并占用内存。可以看看上面文档的相关内容。
>
> Best,
> tison.
>
>
> tison <[hidden email]> 于2020年4月15日周三 下午10:18写道:
>
> > IngestionTime 多次运行结果不一样很正常啊,试试 event time?
> >
> > Best,
> > tison.
> >
> >
> > xuefli <[hidden email]> 于2020年4月15日周三 下午10:10写道:
> >
> >> 遇到一个非常头痛的问题
> >>
> >> Flink1.10的集群,用hdfs做backend
> >>
> >> 一个流aStream准备了10亿的数据,另外一个流bStream百万
> >> 如果如下操作
> >>
> >> 我遇到一个问题 双流Join
> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
> 。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后
> 再对cStream进行keyBy-->timeWindow-->sum.
> >> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
> >> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
> >> 但数据量很大时,就会这样。
> >>
> >>
> >> 每次计算的结果不一样,这个对业务系统挑战巨大
> >>
> >>
> >> 发送自 Windows 10 版邮件应用
> >>
> >>
>
>

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

Zai Arnold
如果选择ingressTime就应该默认结果不一致了,即使看上去是一致的:
1.无法保证每个流的网络IO一致(那就默认为绝对不一致), 这样数据ingress time生产就会不一致。
2.系统资源导致ingress time不一致
因此不太建议依赖ingress 一致。 `这一点golang/map的处理,比较赞成的, 既然不确保有序那就主动打乱它`

或许可以自己维护一个计数器,在source中添加自定义event-time.来模拟ingress time(根据eps计算下counter增长策略)

[hidden email] <[hidden email]> 于2020年4月16日周四 上午10:57写道:

> 双流join涉及的问题我罗列完整一下:
> 前提:
> 假设有两个流
> 其中一个流aStream非常庞大,基于时间单位是秒,源源不断的生产数据
>
> 另外一个流bStream也是很大,是不太可能基于内存做维度数据全局缓冲或者LRU淘汰,因为aStream使用bStream足够分散和随机,基于时间单位是天或者月,会持续不断的变化,部分数据或者长期不变
> 影响流的因子:
>
> 1、         系统集群资源,主要是内存
>
> 2、        流速
>
> 3、        不同流数据变化的时间单位不一致
>
> 4、        同一流内数据变化的时间单位不一致
> 目标:
> 因两个流的数据都原样的保留下来,重算时,要保持每次运算结果是一致的
>
> 对于操作bStream.join(aStream).windows().apply()
> 如果是基于eventTime的问题是
> 对于aStream可以按照每个时间窗口处理数据,合适的随着时间的流式划分窗口处理,
>
> 但对于bStream,因每个部分的数据的有效时间范围不同,bStream的数据是长期驻留在state,还是超过window就被淘汰,如果是被淘汰,那么计算结果肯定有问题,即aStream中的数据肯定从业务上可以匹配到bStream中的数据
>
> 如果是基于ingressTime的问题是
>
> aStream和bStream都受系统运行环境的影响,但如果有办法对于aStream在任何一个window中的数据都能匹配到bStream的数据,肯定没有问题
>
> 那么剩下的关键问题就是:
>
> A、        对于aStream在windows中的数据如何一定匹配到bStream中的数据
>
> B、        对于bStream中的数据每条数据的可用时间范围是变化的,如何保持更新
>
> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
> 发件人: Benchao Li<mailto:[hidden email]>
> 发送时间: 2020年4月16日 10:21
> 收件人: user-zh<mailto:[hidden email]>
> 主题: Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题
>
> 我感觉双流join如果要保证结果是一致的,需要用事件时间,而不是处理时间或者是摄入时间。
> 如果可能,建议尝试下基于事件时间的双流join。
>
> [hidden email] <[hidden email]> 于2020年4月16日周四 上午9:15写道:
>
> > 双流join对数据和环境的要求很严格,这样适用的业务场景就会有限。没有办法保证环境和数据一致是严格符合的
> >
> > 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
> >
> > 发件人: tison<mailto:[hidden email]>
> > 发送时间: 2020年4月15日 22:26
> > 收件人: user-zh<mailto:[hidden email]>
> > 主题: Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题
> >
> > FYI
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html
> >
> > IngestionTime 的时间基准是进入 Flink 系统即 Source 算子生产数据时附上的时间戳,你后面有
> > window/join,这个时间的间隔不是确定性的,大数据量下因为 GC 等原因有不同延迟从而导致 Window 划分不一致是正常的。即使使用
> > EventTime,在 Watermark
> > 不够可靠的情况下也有可能不一致,足够可靠又可能受到滞后数据的影响损失性能并占用内存。可以看看上面文档的相关内容。
> >
> > Best,
> > tison.
> >
> >
> > tison <[hidden email]> 于2020年4月15日周三 下午10:18写道:
> >
> > > IngestionTime 多次运行结果不一样很正常啊,试试 event time?
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > xuefli <[hidden email]> 于2020年4月15日周三 下午10:10写道:
> > >
> > >> 遇到一个非常头痛的问题
> > >>
> > >> Flink1.10的集群,用hdfs做backend
> > >>
> > >> 一个流aStream准备了10亿的数据,另外一个流bStream百万
> > >> 如果如下操作
> > >>
> > >> 我遇到一个问题 双流Join
> >
> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
> > 。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后
> > 再对cStream进行keyBy-->timeWindow-->sum.
> > >> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
> > >> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
> > >> 但数据量很大时,就会这样。
> > >>
> > >>
> > >> 每次计算的结果不一样,这个对业务系统挑战巨大
> > >>
> > >>
> > >> 发送自 Windows 10 版邮件应用
> > >>
> > >>
> >
> >
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [hidden email]; [hidden email]
>
>