含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

classic Classic list List threaded Threaded
12 messages Options
Reply | Threaded
Open this post in threaded view
|

含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

hao kong
hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
Reply | Threaded
Open this post in threaded view
|

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

Congxian Qiu
Hi
    没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
    如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink
作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
    另外你也可以看下这个文档[2] 看是否在你的场景中有帮助

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
Best,
Congxian


hao kong <[hidden email]> 于2020年9月16日周三 上午10:24写道:

> hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
> window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
>
> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
>
Reply | Threaded
Open this post in threaded view
|

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

hao kong
十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。

Congxian Qiu <[hidden email]> 于2020年9月16日周三 下午1:55写道:

> Hi
>     没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
>     如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
> 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink
> 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
>     另外你也可以看下这个文档[2] 看是否在你的场景中有帮助
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
> Best,
> Congxian
>
>
> hao kong <[hidden email]> 于2020年9月16日周三 上午10:24写道:
>
> > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
> > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
> >
> >
> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

nobleyd
的确问题没说明白,貌似flink不会存在类似问题。

hao kong <[hidden email]> 于2020年9月16日周三 下午6:45写道:

> 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。
>
> Congxian Qiu <[hidden email]> 于2020年9月16日周三 下午1:55写道:
>
> > Hi
> >     没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
> >     如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
> > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink
> > 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
> >     另外你也可以看下这个文档[2] 看是否在你的场景中有帮助
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
> > [2]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
> > Best,
> > Congxian
> >
> >
> > hao kong <[hidden email]> 于2020年9月16日周三 上午10:24写道:
> >
> > > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
> > > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
> > >
> > >
> >
> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

hao kong
hi
    感谢各位,pnowojski@apache.org为我提供了一个FLIP,
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-EventTimeAlignment
,我的问题其实就是这个问题,快流会推高水印导致漫流里的数据被丢弃,我已经在开始尝试依赖zookeeper来写*SourceCoordinator,*
从而协调多个源的数据,主要是考虑这样不用考虑高可用,单独的coordinator需要自己实现高可用。

赵一旦 <[hidden email]> 于2020年9月21日周一 下午5:50写道:

> 的确问题没说明白,貌似flink不会存在类似问题。
>
> hao kong <[hidden email]> 于2020年9月16日周三 下午6:45写道:
>
> > 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。
> >
> > Congxian Qiu <[hidden email]> 于2020年9月16日周三 下午1:55写道:
> >
> > > Hi
> > >     没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
> > >     如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
> > > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink
> > > 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
> > >     另外你也可以看下这个文档[2] 看是否在你的场景中有帮助
> > >
> > > [1]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
> > > [2]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
> > > Best,
> > > Congxian
> > >
> > >
> > > hao kong <[hidden email]> 于2020年9月16日周三 上午10:24写道:
> > >
> > > > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
> > > > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
> > > >
> > > >
> > >
> >
> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

nobleyd
还是没看懂。即使你有多个kafka数据源。但是从flink角度来说,就是从若干个partition(一共n个)输入数据(不管来自哪个kafka集群,哪个topic的分区),只要你设置的数据源并行度为n,保证每个并行度只消费一个分区,这样每个并行task实例只针对某一个分区生成watermark(这个要求单分区内部乱序不能太大)。后续算子的watermark是会基于输入算子所有并发实例的watermark取小的,所以貌似也不存在你说的问题。

hao kong <[hidden email]> 于2020年9月27日周日 下午5:16写道:

> hi
>     感谢各位,pnowojski@apache.org为我提供了一个FLIP,
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-EventTimeAlignment
> ,我的问题其实就是这个问题,快流会推高水印导致漫流里的数据被丢弃,我已经在开始尝试依赖zookeeper来写*SourceCoordinator,*
> 从而协调多个源的数据,主要是考虑这样不用考虑高可用,单独的coordinator需要自己实现高可用。
>
> 赵一旦 <[hidden email]> 于2020年9月21日周一 下午5:50写道:
>
> > 的确问题没说明白,貌似flink不会存在类似问题。
> >
> > hao kong <[hidden email]> 于2020年9月16日周三 下午6:45写道:
> >
> > > 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。
> > >
> > > Congxian Qiu <[hidden email]> 于2020年9月16日周三 下午1:55写道:
> > >
> > > > Hi
> > > >     没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
> > > >     如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
> > > > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink
> > > > 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
> > > >     另外你也可以看下这个文档[2] 看是否在你的场景中有帮助
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
> > > > [2]
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > hao kong <[hidden email]> 于2020年9月16日周三 上午10:24写道:
> > > >
> > > > > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
> > > > > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
> > > > >
> > > > >
> > > >
> > >
> >
> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
> > > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

nobleyd
如果说一共n个分区,但是你就是希望使用m<n个并发的kafka数据源算子,也是ok的。因为kafka算子有个参数用于设置从kakfa算子就生成watermark,即不自己使用assignXXX那个api。
kafkaSouce本身会将当前并发实例消费的多个分区的数据分别生成watermark并取小后再发射出去。

赵一旦 <[hidden email]> 于2020年9月27日周日 下午5:51写道:

>
> 还是没看懂。即使你有多个kafka数据源。但是从flink角度来说,就是从若干个partition(一共n个)输入数据(不管来自哪个kafka集群,哪个topic的分区),只要你设置的数据源并行度为n,保证每个并行度只消费一个分区,这样每个并行task实例只针对某一个分区生成watermark(这个要求单分区内部乱序不能太大)。后续算子的watermark是会基于输入算子所有并发实例的watermark取小的,所以貌似也不存在你说的问题。
>
> hao kong <[hidden email]> 于2020年9月27日周日 下午5:16写道:
>
>> hi
>>     感谢各位,pnowojski@apache.org为我提供了一个FLIP,
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-EventTimeAlignment
>> ,我的问题其实就是这个问题,快流会推高水印导致漫流里的数据被丢弃,我已经在开始尝试依赖zookeeper来写*SourceCoordinator,*
>> 从而协调多个源的数据,主要是考虑这样不用考虑高可用,单独的coordinator需要自己实现高可用。
>>
>> 赵一旦 <[hidden email]> 于2020年9月21日周一 下午5:50写道:
>>
>> > 的确问题没说明白,貌似flink不会存在类似问题。
>> >
>> > hao kong <[hidden email]> 于2020年9月16日周三 下午6:45写道:
>> >
>> > > 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。
>> > >
>> > > Congxian Qiu <[hidden email]> 于2020年9月16日周三 下午1:55写道:
>> > >
>> > > > Hi
>> > > >     没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
>> > > >     如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
>> > > > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink
>> > > > 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
>> > > >     另外你也可以看下这个文档[2] 看是否在你的场景中有帮助
>> > > >
>> > > > [1]
>> > > >
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>> > > > [2]
>> > > >
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>> > > > Best,
>> > > > Congxian
>> > > >
>> > > >
>> > > > hao kong <[hidden email]> 于2020年9月16日周三 上午10:24写道:
>> > > >
>> > > > > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
>> > > > > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
>> > > > >
>> > > >
>> > >
>> >
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

hao kong
我目前的情况是从多个kafka
topic获取数据并union到一起进行处理,例如A流的时间是1-100共1w条数据,B流只有时间是1和100的两条数据,由于目前flink
source之间没有*Coordinator*
,两个流的流速在数据量相同的时候是一样的,在union后的timewindow标记watermark里会先拿到A流的1,B流的1,然后拿到A流的1.X,B流的100,这时根据watermark的配置,如果没有设置延迟等待,会丢弃掉A流剩下的9998条数据,如果是多源不union,并行处理的话,不会有这个问题。

赵一旦 <[hidden email]> 于2020年9月27日周日 下午5:53写道:

>
> 如果说一共n个分区,但是你就是希望使用m<n个并发的kafka数据源算子,也是ok的。因为kafka算子有个参数用于设置从kakfa算子就生成watermark,即不自己使用assignXXX那个api。
> kafkaSouce本身会将当前并发实例消费的多个分区的数据分别生成watermark并取小后再发射出去。
>
> 赵一旦 <[hidden email]> 于2020年9月27日周日 下午5:51写道:
>
> >
> >
> 还是没看懂。即使你有多个kafka数据源。但是从flink角度来说,就是从若干个partition(一共n个)输入数据(不管来自哪个kafka集群,哪个topic的分区),只要你设置的数据源并行度为n,保证每个并行度只消费一个分区,这样每个并行task实例只针对某一个分区生成watermark(这个要求单分区内部乱序不能太大)。后续算子的watermark是会基于输入算子所有并发实例的watermark取小的,所以貌似也不存在你说的问题。
> >
> > hao kong <[hidden email]> 于2020年9月27日周日 下午5:16写道:
> >
> >> hi
> >>     感谢各位,pnowojski@apache.org为我提供了一个FLIP,
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-EventTimeAlignment
> >>
> ,我的问题其实就是这个问题,快流会推高水印导致漫流里的数据被丢弃,我已经在开始尝试依赖zookeeper来写*SourceCoordinator,*
> >> 从而协调多个源的数据,主要是考虑这样不用考虑高可用,单独的coordinator需要自己实现高可用。
> >>
> >> 赵一旦 <[hidden email]> 于2020年9月21日周一 下午5:50写道:
> >>
> >> > 的确问题没说明白,貌似flink不会存在类似问题。
> >> >
> >> > hao kong <[hidden email]> 于2020年9月16日周三 下午6:45写道:
> >> >
> >> > > 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。
> >> > >
> >> > > Congxian Qiu <[hidden email]> 于2020年9月16日周三 下午1:55写道:
> >> > >
> >> > > > Hi
> >> > > >     没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
> >> > > >     如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
> >> > > > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink
> >> > > > 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
> >> > > >     另外你也可以看下这个文档[2] 看是否在你的场景中有帮助
> >> > > >
> >> > > > [1]
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
> >> > > > [2]
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
> >> > > > Best,
> >> > > > Congxian
> >> > > >
> >> > > >
> >> > > > hao kong <[hidden email]> 于2020年9月16日周三 上午10:24写道:
> >> > > >
> >> > > > > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
> >> > > > > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
> >> > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

nobleyd
说实话,还是不觉得有这种case。KafkaSouceA(1,1,1,2,2,2,.....,100),KafkaSouceB(1,100),然后AB都接到
operatorC 上,operatorC后续跟个窗口算子operatorD。
(1)oepratorC负责调用assignTimestampsAndWatermarks
完成watermark的生成。这么搞会出问题,因为operatorC会更快收到KafkaSouceB的100,进而生成watermark=100(假设maxOutOfOrderness=0)。那么operatorD在收到KafkaSouceA的后续元素会认为迟到丢弃。

(2)如果在KafkaSouceA部分直接生成watermark,KafkaSourceB部分也直接生成watermark,然后算子operatorC部分相当于是watermark的合并取小,对于后续的operatorD也是不会影响的。只有KafkaSouceA和KafkaSouceB的100都到达(此时KafkaSouceA和KafkaSouceB都分别发出了100的watermark),进而对于operatorC的watermark才会推进到100,再然后是operatorD的watermark推进到100。数据是不会丢失的呀。

(3)还有一种更暴力的,如果KafkaSouceA和B都是相同kafka集群,只是topic不同,完全可以合并为一个KafkaSouce,使用regex方式指定topic进行消费。
最后一点,(2)中说的watermark赋值也有2中实现方案。1
是在KafkaSouce后面通过forward分区方式跟一个相同并发的watermarkAssigner即可。 2 是不额外引入算子,而是直接调用
flinkKafkaConsumer.assignTimestampsAndWatermarks实现。


最后,问下你是FlinkSQL场景还是DataStream API,会不会我们场景不同,sql场景可能是没有这么灵活。

hao kong <[hidden email]> 于2020年9月28日周一 上午11:12写道:

> 我目前的情况是从多个kafka
> topic获取数据并union到一起进行处理,例如A流的时间是1-100共1w条数据,B流只有时间是1和100的两条数据,由于目前flink
> source之间没有*Coordinator*
>
> ,两个流的流速在数据量相同的时候是一样的,在union后的timewindow标记watermark里会先拿到A流的1,B流的1,然后拿到A流的1.X,B流的100,这时根据watermark的配置,如果没有设置延迟等待,会丢弃掉A流剩下的9998条数据,如果是多源不union,并行处理的话,不会有这个问题。
>
> 赵一旦 <[hidden email]> 于2020年9月27日周日 下午5:53写道:
>
> >
> >
> 如果说一共n个分区,但是你就是希望使用m<n个并发的kafka数据源算子,也是ok的。因为kafka算子有个参数用于设置从kakfa算子就生成watermark,即不自己使用assignXXX那个api。
> > kafkaSouce本身会将当前并发实例消费的多个分区的数据分别生成watermark并取小后再发射出去。
> >
> > 赵一旦 <[hidden email]> 于2020年9月27日周日 下午5:51写道:
> >
> > >
> > >
> >
> 还是没看懂。即使你有多个kafka数据源。但是从flink角度来说,就是从若干个partition(一共n个)输入数据(不管来自哪个kafka集群,哪个topic的分区),只要你设置的数据源并行度为n,保证每个并行度只消费一个分区,这样每个并行task实例只针对某一个分区生成watermark(这个要求单分区内部乱序不能太大)。后续算子的watermark是会基于输入算子所有并发实例的watermark取小的,所以貌似也不存在你说的问题。
> > >
> > > hao kong <[hidden email]> 于2020年9月27日周日 下午5:16写道:
> > >
> > >> hi
> > >>     感谢各位,pnowojski@apache.org为我提供了一个FLIP,
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-EventTimeAlignment
> > >>
> >
> ,我的问题其实就是这个问题,快流会推高水印导致漫流里的数据被丢弃,我已经在开始尝试依赖zookeeper来写*SourceCoordinator,*
> > >> 从而协调多个源的数据,主要是考虑这样不用考虑高可用,单独的coordinator需要自己实现高可用。
> > >>
> > >> 赵一旦 <[hidden email]> 于2020年9月21日周一 下午5:50写道:
> > >>
> > >> > 的确问题没说明白,貌似flink不会存在类似问题。
> > >> >
> > >> > hao kong <[hidden email]> 于2020年9月16日周三 下午6:45写道:
> > >> >
> > >> > > 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。
> > >> > >
> > >> > > Congxian Qiu <[hidden email]> 于2020年9月16日周三 下午1:55写道:
> > >> > >
> > >> > > > Hi
> > >> > > >     没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
> > >> > > >     如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
> > >> > > > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个
> Flink
> > >> > > > 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
> > >> > > >     另外你也可以看下这个文档[2] 看是否在你的场景中有帮助
> > >> > > >
> > >> > > > [1]
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
> > >> > > > [2]
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
> > >> > > > Best,
> > >> > > > Congxian
> > >> > > >
> > >> > > >
> > >> > > > hao kong <[hidden email]> 于2020年9月16日周三 上午10:24写道:
> > >> > > >
> > >> > > > > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
> > >> > > > > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
> > >> > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

nobleyd
我这边负责的作业,一个作业上有2-3个kafka数据源,还包括多个mysql配置流数据源。也是各种join,但是没有union的case。
没有任何watermark的问题,flink现有机制都是可以完美解决的。

赵一旦 <[hidden email]> 于2020年9月28日周一 上午11:37写道:

> 说实话,还是不觉得有这种case。KafkaSouceA(1,1,1,2,2,2,.....,100),KafkaSouceB(1,100),然后AB都接到
> operatorC 上,operatorC后续跟个窗口算子operatorD。
> (1)oepratorC负责调用assignTimestampsAndWatermarks
> 完成watermark的生成。这么搞会出问题,因为operatorC会更快收到KafkaSouceB的100,进而生成watermark=100(假设maxOutOfOrderness=0)。那么operatorD在收到KafkaSouceA的后续元素会认为迟到丢弃。
>
>
> (2)如果在KafkaSouceA部分直接生成watermark,KafkaSourceB部分也直接生成watermark,然后算子operatorC部分相当于是watermark的合并取小,对于后续的operatorD也是不会影响的。只有KafkaSouceA和KafkaSouceB的100都到达(此时KafkaSouceA和KafkaSouceB都分别发出了100的watermark),进而对于operatorC的watermark才会推进到100,再然后是operatorD的watermark推进到100。数据是不会丢失的呀。
>
>
> (3)还有一种更暴力的,如果KafkaSouceA和B都是相同kafka集群,只是topic不同,完全可以合并为一个KafkaSouce,使用regex方式指定topic进行消费。
> 最后一点,(2)中说的watermark赋值也有2中实现方案。1
> 是在KafkaSouce后面通过forward分区方式跟一个相同并发的watermarkAssigner即可。 2 是不额外引入算子,而是直接调用
> flinkKafkaConsumer.assignTimestampsAndWatermarks实现。
>
>
> 最后,问下你是FlinkSQL场景还是DataStream API,会不会我们场景不同,sql场景可能是没有这么灵活。
>
> hao kong <[hidden email]> 于2020年9月28日周一 上午11:12写道:
>
>> 我目前的情况是从多个kafka
>> topic获取数据并union到一起进行处理,例如A流的时间是1-100共1w条数据,B流只有时间是1和100的两条数据,由于目前flink
>> source之间没有*Coordinator*
>>
>> ,两个流的流速在数据量相同的时候是一样的,在union后的timewindow标记watermark里会先拿到A流的1,B流的1,然后拿到A流的1.X,B流的100,这时根据watermark的配置,如果没有设置延迟等待,会丢弃掉A流剩下的9998条数据,如果是多源不union,并行处理的话,不会有这个问题。
>>
>> 赵一旦 <[hidden email]> 于2020年9月27日周日 下午5:53写道:
>>
>> >
>> >
>> 如果说一共n个分区,但是你就是希望使用m<n个并发的kafka数据源算子,也是ok的。因为kafka算子有个参数用于设置从kakfa算子就生成watermark,即不自己使用assignXXX那个api。
>> > kafkaSouce本身会将当前并发实例消费的多个分区的数据分别生成watermark并取小后再发射出去。
>> >
>> > 赵一旦 <[hidden email]> 于2020年9月27日周日 下午5:51写道:
>> >
>> > >
>> > >
>> >
>> 还是没看懂。即使你有多个kafka数据源。但是从flink角度来说,就是从若干个partition(一共n个)输入数据(不管来自哪个kafka集群,哪个topic的分区),只要你设置的数据源并行度为n,保证每个并行度只消费一个分区,这样每个并行task实例只针对某一个分区生成watermark(这个要求单分区内部乱序不能太大)。后续算子的watermark是会基于输入算子所有并发实例的watermark取小的,所以貌似也不存在你说的问题。
>> > >
>> > > hao kong <[hidden email]> 于2020年9月27日周日 下午5:16写道:
>> > >
>> > >> hi
>> > >>     感谢各位,pnowojski@apache.org为我提供了一个FLIP,
>> > >>
>> > >>
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-EventTimeAlignment
>> > >>
>> >
>> ,我的问题其实就是这个问题,快流会推高水印导致漫流里的数据被丢弃,我已经在开始尝试依赖zookeeper来写*SourceCoordinator,*
>> > >> 从而协调多个源的数据,主要是考虑这样不用考虑高可用,单独的coordinator需要自己实现高可用。
>> > >>
>> > >> 赵一旦 <[hidden email]> 于2020年9月21日周一 下午5:50写道:
>> > >>
>> > >> > 的确问题没说明白,貌似flink不会存在类似问题。
>> > >> >
>> > >> > hao kong <[hidden email]> 于2020年9月16日周三 下午6:45写道:
>> > >> >
>> > >> > > 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。
>> > >> > >
>> > >> > > Congxian Qiu <[hidden email]> 于2020年9月16日周三 下午1:55写道:
>> > >> > >
>> > >> > > > Hi
>> > >> > > >     没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
>> > >> > > >     如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
>> > >> > > > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个
>> Flink
>> > >> > > > 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
>> > >> > > >     另外你也可以看下这个文档[2] 看是否在你的场景中有帮助
>> > >> > > >
>> > >> > > > [1]
>> > >> > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>> > >> > > > [2]
>> > >> > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>> > >> > > > Best,
>> > >> > > > Congxian
>> > >> > > >
>> > >> > > >
>> > >> > > > hao kong <[hidden email]> 于2020年9月16日周三 上午10:24写道:
>> > >> > > >
>> > >> > > > > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
>> > >> > > > > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
>> > >> > > > >
>> > >> > > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> >
>> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
>> > >> > > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> >
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

hao kong
是DataStream
API,主要是从用户不同行为属性,例如支付,访问等在不同的topic里,union后根据session的窗口聚合,获取到根据userId和sessionId两类型数据,分别是用户全部行为和用户单次访问的全部行为。
处理全实时数据没问题,但是如果处理带有历史数据就会遇到每一个topic内历史数据量不同,之前的解决方案是分为批处理和流处理两步,现在想把这两步合并起来,就会出现watermark被推高,丢掉快流的数据。
其实不光是丢数据的问题,如果按照快流的watermark来设置的话,也会出现,大量数据缓存在state里的情况,所以想做根据时间戳控制source读取速度。
非常感谢你提的三种方法,我比较倾向第三种,通过单个source进行控速比较符合我这的case,每次去读取不同的topic前两条数据,对比时间,确认下一个读取的目标。
Reply | Threaded
Open this post in threaded view
|

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

Shengkai Fang
In reply to this post by nobleyd
hi, 你说的是为每个partition生成一个watermark吗? 这样子快流和慢流都会有独立的watermark gererator。
datastream已经支持了该特性, table层正在支持该特性,你可以看看flink-19282的jira。

赵一旦 <[hidden email]>于2020年9月28日 周一上午11:39写道:

> 我这边负责的作业,一个作业上有2-3个kafka数据源,还包括多个mysql配置流数据源。也是各种join,但是没有union的case。
>
> 没有任何watermark的问题,flink现有机制都是可以完美解决的。
>
>
>
> 赵一旦 <[hidden email]> 于2020年9月28日周一 上午11:37写道:
>
>
>
> >
> 说实话,还是不觉得有这种case。KafkaSouceA(1,1,1,2,2,2,.....,100),KafkaSouceB(1,100),然后AB都接到
>
> > operatorC 上,operatorC后续跟个窗口算子operatorD。
>
> > (1)oepratorC负责调用assignTimestampsAndWatermarks
>
> >
> 完成watermark的生成。这么搞会出问题,因为operatorC会更快收到KafkaSouceB的100,进而生成watermark=100(假设maxOutOfOrderness=0)。那么operatorD在收到KafkaSouceA的后续元素会认为迟到丢弃。
>
> >
>
> >
>
> >
> (2)如果在KafkaSouceA部分直接生成watermark,KafkaSourceB部分也直接生成watermark,然后算子operatorC部分相当于是watermark的合并取小,对于后续的operatorD也是不会影响的。只有KafkaSouceA和KafkaSouceB的100都到达(此时KafkaSouceA和KafkaSouceB都分别发出了100的watermark),进而对于operatorC的watermark才会推进到100,再然后是operatorD的watermark推进到100。数据是不会丢失的呀。
>
> >
>
> >
>
> >
> (3)还有一种更暴力的,如果KafkaSouceA和B都是相同kafka集群,只是topic不同,完全可以合并为一个KafkaSouce,使用regex方式指定topic进行消费。
>
> > 最后一点,(2)中说的watermark赋值也有2中实现方案。1
>
> > 是在KafkaSouce后面通过forward分区方式跟一个相同并发的watermarkAssigner即可。 2 是不额外引入算子,而是直接调用
>
> > flinkKafkaConsumer.assignTimestampsAndWatermarks实现。
>
> >
>
> >
>
> > 最后,问下你是FlinkSQL场景还是DataStream API,会不会我们场景不同,sql场景可能是没有这么灵活。
>
> >
>
> > hao kong <[hidden email]> 于2020年9月28日周一 上午11:12写道:
>
> >
>
> >> 我目前的情况是从多个kafka
>
> >> topic获取数据并union到一起进行处理,例如A流的时间是1-100共1w条数据,B流只有时间是1和100的两条数据,由于目前flink
>
> >> source之间没有*Coordinator*
>
> >>
>
> >>
> ,两个流的流速在数据量相同的时候是一样的,在union后的timewindow标记watermark里会先拿到A流的1,B流的1,然后拿到A流的1.X,B流的100,这时根据watermark的配置,如果没有设置延迟等待,会丢弃掉A流剩下的9998条数据,如果是多源不union,并行处理的话,不会有这个问题。
>
> >>
>
> >> 赵一旦 <[hidden email]> 于2020年9月27日周日 下午5:53写道:
>
> >>
>
> >> >
>
> >> >
>
> >>
> 如果说一共n个分区,但是你就是希望使用m<n个并发的kafka数据源算子,也是ok的。因为kafka算子有个参数用于设置从kakfa算子就生成watermark,即不自己使用assignXXX那个api。
>
> >> > kafkaSouce本身会将当前并发实例消费的多个分区的数据分别生成watermark并取小后再发射出去。
>
> >> >
>
> >> > 赵一旦 <[hidden email]> 于2020年9月27日周日 下午5:51写道:
>
> >> >
>
> >> > >
>
> >> > >
>
> >> >
>
> >>
> 还是没看懂。即使你有多个kafka数据源。但是从flink角度来说,就是从若干个partition(一共n个)输入数据(不管来自哪个kafka集群,哪个topic的分区),只要你设置的数据源并行度为n,保证每个并行度只消费一个分区,这样每个并行task实例只针对某一个分区生成watermark(这个要求单分区内部乱序不能太大)。后续算子的watermark是会基于输入算子所有并发实例的watermark取小的,所以貌似也不存在你说的问题。
>
> >> > >
>
> >> > > hao kong <[hidden email]> 于2020年9月27日周日 下午5:16写道:
>
> >> > >
>
> >> > >> hi
>
> >> > >>     感谢各位,pnowojski@apache.org为我提供了一个FLIP,
>
> >> > >>
>
> >> > >>
>
> >> >
>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-EventTimeAlignment
>
> >> > >>
>
> >> >
>
> >>
> ,我的问题其实就是这个问题,快流会推高水印导致漫流里的数据被丢弃,我已经在开始尝试依赖zookeeper来写*SourceCoordinator,*
>
> >> > >> 从而协调多个源的数据,主要是考虑这样不用考虑高可用,单独的coordinator需要自己实现高可用。
>
> >> > >>
>
> >> > >> 赵一旦 <[hidden email]> 于2020年9月21日周一 下午5:50写道:
>
> >> > >>
>
> >> > >> > 的确问题没说明白,貌似flink不会存在类似问题。
>
> >> > >> >
>
> >> > >> > hao kong <[hidden email]> 于2020年9月16日周三 下午6:45写道:
>
> >> > >> >
>
> >> > >> > > 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。
>
> >> > >> > >
>
> >> > >> > > Congxian Qiu <[hidden email]> 于2020年9月16日周三 下午1:55写道:
>
> >> > >> > >
>
> >> > >> > > > Hi
>
> >> > >> > > >     没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
>
> >> > >> > > >     如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
>
> >> > >> > > > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个
>
> >> Flink
>
> >> > >> > > > 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
>
> >> > >> > > >     另外你也可以看下这个文档[2] 看是否在你的场景中有帮助
>
> >> > >> > > >
>
> >> > >> > > > [1]
>
> >> > >> > > >
>
> >> > >> > > >
>
> >> > >> > >
>
> >> > >> >
>
> >> > >>
>
> >> >
>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>
> >> > >> > > > [2]
>
> >> > >> > > >
>
> >> > >> > > >
>
> >> > >> > >
>
> >> > >> >
>
> >> > >>
>
> >> >
>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>
> >> > >> > > > Best,
>
> >> > >> > > > Congxian
>
> >> > >> > > >
>
> >> > >> > > >
>
> >> > >> > > > hao kong <[hidden email]> 于2020年9月16日周三 上午10:24写道:
>
> >> > >> > > >
>
> >> > >> > > > > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
>
> >> > >> > > > > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
>
> >> > >> > > > >
>
> >> > >> > > > >
>
> >> > >> > > >
>
> >> > >> > >
>
> >> > >> >
>
> >> > >>
>
> >> >
>
> >>
> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
>
> >> > >> > > > >
>
> >> > >> > > >
>
> >> > >> > >
>
> >> > >> >
>
> >> > >>
>
> >> > >
>
> >> >
>
> >>
>
> >
>
>