Confusion about the watermark alignment logic in the Flink source code


Cayden chen
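Hi all, I have recently been reading through the source code for watermark propagation, and I am confused about the watermark alignment logic. The code is as follows: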

public void inputWatermark(Watermark watermark, int channelIndex) {
    // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
    if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
        long watermarkMillis = watermark.getTimestamp();

        // if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
        if (watermarkMillis > channelStatuses[channelIndex].watermark) {
            channelStatuses[channelIndex].watermark = watermarkMillis;

            // previously unaligned input channels are now aligned if its watermark has caught up
            if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
                channelStatuses[channelIndex].isWatermarkAligned = true;
            }

            // now, attempt to find a new min watermark across all aligned channels
            findAndOutputNewMinWatermarkAcrossAlignedChannels();
        }
    }
}

private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
    long newMinWatermark = Long.MAX_VALUE;
    boolean hasAlignedChannels = false;

    // determine new overall watermark by considering only watermark-aligned channels across all channels
    for (InputChannelStatus channelStatus : channelStatuses) {
        if (channelStatus.isWatermarkAligned) {
            hasAlignedChannels = true;
            newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
        }
    }

    // we acknowledge and output the new overall watermark if it really is aggregated
    // from some remaining aligned channel, and is also larger than the last output watermark
    if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
        lastOutputWatermark = newMinWatermark;
        outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
    }
}



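This code does not seem to contain any logic in which the different channels (channelIndex) wait for each other's watermarks to arrive. Does it really just take the minimum over the watermarks that arrive on different channels at different times?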

Re: Confusion about the watermark alignment logic in the Flink source code

Benchao Li
Hi,
I think your understanding is correct. The watermark of the current subtask is simply the minimum of the watermarks across its input channels.
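
To make the "minimum across input channels" behavior concrete, here is a minimal sketch in plain Java. `MiniWatermarkValve` is a hypothetical class written for illustration, not Flink's own valve: it drops the stream-status and alignment flags from the quoted code and assumes every channel starts at Long.MIN_VALUE. It suggests that the "waiting" is implicit: the output cannot advance until the slowest channel has reported something larger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-in for the quoted valve logic (not Flink's class). */
class MiniWatermarkValve {
    // Last watermark seen per channel; assumed to start at Long.MIN_VALUE.
    private final long[] channelWatermarks;
    private long lastOutputWatermark = Long.MIN_VALUE;
    // Watermarks that would have been forwarded downstream, in order.
    final List<Long> emitted = new ArrayList<>();

    MiniWatermarkValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    void inputWatermark(long timestamp, int channelIndex) {
        // Ignore watermarks that do not advance this channel.
        if (timestamp <= channelWatermarks[channelIndex]) {
            return;
        }
        channelWatermarks[channelIndex] = timestamp;

        // Recompute the minimum across all channels.
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        // Emit only if the overall minimum actually moved forward.
        if (min > lastOutputWatermark) {
            lastOutputWatermark = min;
            emitted.add(min);
        }
    }

    public static void main(String[] args) {
        MiniWatermarkValve valve = new MiniWatermarkValve(2);
        valve.inputWatermark(10, 0); // channel 1 is still at MIN_VALUE: nothing emitted
        valve.inputWatermark(20, 0); // still held back by channel 1
        valve.inputWatermark(5, 1);  // min(20, 5) = 5 is emitted
        valve.inputWatermark(30, 1); // min(20, 30) = 20 is emitted
        System.out.println(valve.emitted); // prints [5, 20]
    }
}
```

In this sketch the first two watermarks on channel 0 produce no output at all, because channel 1 has not yet reported anything.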

1193216154 <[hidden email]> wrote on Monday, May 11, 2020 at 3:17 PM:

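> This code does not seem to contain any logic in which the different channels (channelIndex) wait for
> each other's watermarks to arrive. Does it really just take the minimum over the watermarks that
> arrive on different channels at different times?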



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Re: Confusion about the watermark alignment logic in the Flink source code

Yun Tang
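Hi,

Precisely because the minimum across all input channels is taken, if one upstream never receives any real data, the watermark it sends downstream stays at Long.MIN_VALUE, and as a result windows can never fire. The community works around this problem with idle sources [1].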

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#idling-sources
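
The effect of marking a channel idle can be sketched in plain Java as follows. `IdleAwareValve` and its `markIdle` method are hypothetical names used only for illustration, and the sketch is a simplification: it recomputes the minimum whenever a channel goes idle and never reactivates a channel, which is not necessarily how Flink handles stream status internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: idle channels are excluded from the minimum (not Flink's class). */
class IdleAwareValve {
    private final long[] watermarks;
    private final boolean[] idle;
    private long lastOutputWatermark = Long.MIN_VALUE;
    // Watermarks that would have been forwarded downstream, in order.
    final List<Long> emitted = new ArrayList<>();

    IdleAwareValve(int numChannels) {
        watermarks = new long[numChannels];
        idle = new boolean[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    void inputWatermark(long timestamp, int channelIndex) {
        // Ignore input from idle channels and watermarks that do not advance.
        if (idle[channelIndex] || timestamp <= watermarks[channelIndex]) {
            return;
        }
        watermarks[channelIndex] = timestamp;
        emitMinOverActiveChannels();
    }

    /** Assumed hook: called when an upstream declares itself idle. */
    void markIdle(int channelIndex) {
        idle[channelIndex] = true;
        emitMinOverActiveChannels();
    }

    private void emitMinOverActiveChannels() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // Emit only if some channel is active and the minimum moved forward.
        if (anyActive && min > lastOutputWatermark) {
            lastOutputWatermark = min;
            emitted.add(min);
        }
    }

    public static void main(String[] args) {
        IdleAwareValve valve = new IdleAwareValve(2);
        valve.inputWatermark(100, 0); // blocked: channel 1 is stuck at MIN_VALUE
        valve.markIdle(1);            // channel 1 no longer counts: 100 is emitted
        valve.inputWatermark(200, 0); // 200 is emitted
        System.out.println(valve.emitted); // prints [100, 200]
    }
}
```

Without the `markIdle(1)` call, this sketch would never emit anything, which mirrors the stuck-window situation described above.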

Best regards,
Yun Tang
________________________________
From: Benchao Li <[hidden email]>
Sent: Monday, May 11, 2020 16:12
To: user-zh <[hidden email]>
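Subject: Re: Confusion about the watermark alignment logic in the Flink source code

Hi,
I think your understanding is correct. The watermark of the current subtask is simply the minimum of the watermarks across its input channels.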
