Hi all, I've recently been reading through the source code for watermark propagation, and I have some doubts about the watermark alignment logic. The code is as follows:

    public void inputWatermark(Watermark watermark, int channelIndex) {
        // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
        if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
            long watermarkMillis = watermark.getTimestamp();

            // if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
            if (watermarkMillis > channelStatuses[channelIndex].watermark) {
                channelStatuses[channelIndex].watermark = watermarkMillis;

                // previously unaligned input channels are now aligned if its watermark has caught up
                if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
                    channelStatuses[channelIndex].isWatermarkAligned = true;
                }

                // now, attempt to find a new min watermark across all aligned channels
                findAndOutputNewMinWatermarkAcrossAlignedChannels();
            }
        }
    }

    private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;

        // determine new overall watermark by considering only watermark-aligned channels across all channels
        for (InputChannelStatus channelStatus : channelStatuses) {
            if (channelStatus.isWatermarkAligned) {
                hasAlignedChannels = true;
                newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
            }
        }

        // we acknowledge and output the new overall watermark if it really is aggregated
        // from some remaining aligned channel, and is also larger than the last output watermark
        if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
            lastOutputWatermark = newMinWatermark;
            outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
        }
    }

This code does not seem to contain any logic that makes the different channels (channelIndex) wait for each other's watermarks to arrive. Does it really just take the minimum over the watermarks that arrive on different channels at different times?
Hi,
I think your understanding is correct. The watermark of the current subtask is simply the minimum of the watermarks across its input channels.

On Mon, May 11, 2020 at 3:17 PM, 1193216154 <[hidden email]> wrote:

> This code does not seem to contain any logic that makes the different
> channels (channelIndex) wait for each other's watermarks to arrive. Does it
> really just take the minimum over the watermarks that arrive on different
> channels at different times?

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
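To make the "minimum across input channels" behaviour concrete, here is a minimal standalone sketch. It is not Flink's actual class: the names MinWatermarkSketch and onWatermark are hypothetical, and the idle/aligned flags from the quoted code are deliberately left out. It only shows that no channel explicitly waits for another; each arriving watermark updates its channel's slot, and the output advances only when the minimum over all slots grows.

```java
import java.util.Arrays;

// Hypothetical, simplified model of per-channel watermark tracking.
// It omits the stream-status (idle) and isWatermarkAligned handling.
class MinWatermarkSketch {
    private final long[] channelWatermarks;
    private long lastOutputWatermark = Long.MIN_VALUE;

    MinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        // Every channel starts at Long.MIN_VALUE until it reports a watermark.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the newly emitted watermark, or null if nothing is emitted. */
    Long onWatermark(long timestamp, int channelIndex) {
        // Ignore watermarks that do not advance this channel.
        if (timestamp <= channelWatermarks[channelIndex]) {
            return null;
        }
        channelWatermarks[channelIndex] = timestamp;

        // The candidate output is the minimum over all channels.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > lastOutputWatermark) {
            lastOutputWatermark = min;
            return min;
        }
        return null;
    }
}
```

With two channels, a watermark of 10 on channel 0 emits nothing in this sketch, because channel 1 is still at Long.MIN_VALUE. The "waiting" is implicit in the minimum: the output can only move once the slowest channel has reported.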
Hi
Precisely because the minimum across the input channels is taken, if one upstream never receives any real data, the watermark it sends downstream stays at Long.MIN_VALUE, and windows can never fire as a result. The community works around this problem with idle sources [1].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#idling-sources

Best regards,
Yun Tang

________________________________
From: Benchao Li <[hidden email]>
Sent: Monday, May 11, 2020 16:12
To: user-zh <[hidden email]>
Subject: Re: Doubts about the watermark alignment logic in the Flink source code

Hi,

I think your understanding is correct. The watermark of the current subtask is simply the minimum of the watermarks across its input channels.
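As a rough illustration of why marking a channel idle helps, the sketch below extends the same idea with an idle flag. It is a simplified model under stated assumptions, not Flink's implementation: IdleAwareMinSketch, onWatermark and markIdle are hypothetical names, and the real valve also handles channels becoming active again and the case where every channel is idle.

```java
import java.util.Arrays;

// Hypothetical, simplified model: idle channels are skipped when taking the minimum.
class IdleAwareMinSketch {
    private final long[] watermarks;
    private final boolean[] idle;
    private long lastOutput = Long.MIN_VALUE;

    IdleAwareMinSketch(int numChannels) {
        watermarks = new long[numChannels];
        idle = new boolean[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    /** Returns the newly emitted watermark, or null if nothing is emitted. */
    Long onWatermark(long timestamp, int channelIndex) {
        if (idle[channelIndex] || timestamp <= watermarks[channelIndex]) {
            return null;
        }
        watermarks[channelIndex] = timestamp;
        return recompute();
    }

    /** Marks a channel idle so it no longer holds back the minimum. */
    Long markIdle(int channelIndex) {
        idle[channelIndex] = true;
        return recompute();
    }

    private Long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // Emit only if some active channel contributed and the minimum advanced.
        if (anyActive && min > lastOutput) {
            lastOutput = min;
            return min;
        }
        return null;
    }
}
```

In this model, a channel stuck at Long.MIN_VALUE blocks all output until markIdle is called for it, after which the remaining active channel alone determines the watermark.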