Hello,大家好:
在flink stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 为了实现这个功能,我想有两种方法: 1. 在算子输出后面重新为消息分配水印:看到flink stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 2. 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。 我现在只能使用`assignTimestampsAndWatermarks` 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? 感谢解答! |
在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark
的。另外语义上使用 AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang <[hidden email]> 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink > stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 > 为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink > stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. > 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。 > > > 我现在只能使用`assignTimestampsAndWatermarks` > 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答! |
In reply to this post by taowang
感谢回复,但是很抱歉我试了一下发现不可以。
无论是使用了`AssignerWithPeriodicWatermarks`的`getCurrentWatermark`还是`AssignerWithPunctuatedWatermarks`的`checkAndGetNextWatermark`,当它们`return null`时下游算子拿到的水印都显示为`No Watermark`,我在下游算子中打印出`context.currentWatermark()`发现都是`Long.MIN_VALUE`的值。 看了这两个接口文档,不太理解这里的`no new watermark will be generated.`是什么意思,一般理解为不会生成新的水印那之前的水印就应该被继续传下去吧,但实际看来是下游接收不到水印(`no watermark`?)。 @tison,你那边是尝试过这种写法吗,如果可以的话,能麻烦给一个简单的demo吗。 感谢帮助!🙏🙏🙏 ``` public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> { /** * Returns the current watermark. This method is periodically called by the * system to retrieve the current watermark. The method may return {@code null} to * indicate that no new Watermark is available. * * <p>The returned watermark will be emitted only if it is non-null and its timestamp * is larger than that of the previously emitted watermark (to preserve the contract of * ascending watermarks). If the current watermark is still * identical to the previous one, no progress in event time has happened since * the previous call to this method. If a null value is returned, or the timestamp * of the returned watermark is smaller than that of the last emitted one, then no * new watermark will be generated. * * <p>The interval in which this method is called and Watermarks are generated * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. * * @see org.apache.flink.streaming.api.watermark.Watermark * @see ExecutionConfig#getAutoWatermarkInterval() * * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit. */ @Nullable Watermark getCurrentWatermark(); } ``` 原始邮件 发件人: tison<[hidden email]> 收件人: user-zh<[hidden email]> 发送时间: 2020年4月16日(周四) 20:33 主题: Re: 为消息分配时间戳但不想重新分配水印 在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark 的。另外语义上使用 AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang <[hidden email]> 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink > stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 > 为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink > stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. > 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。 > > > 我现在只能使用`assignTimestampsAndWatermarks` > 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答! |
In reply to this post by taowang
喔,看差了...Flink 原生没有这样的支持,不过可以扩展基础的算子来实现你要的逻辑
参考 assignTimestampsAndWatermarks 的实现,以及 TimestampsAndPunctuatedWatermarksOperator,不要重写 processWatermark 方法,应该可以实现。DataStream 方面调用更基础的 transform 方法 如果你觉得这是一个合理的需求,应该开箱即用,也可以到 Flink 的 JIRA(issue)列表上提 https://jira.apache.org/jira/projects/FLINK/issues 注册 id 后创建 JIRA 单即可 Best, tison. taowang <[hidden email]> 于2020年4月16日周四 下午10:12写道: > 感谢回复,但是很抱歉我试了一下发现不可以。 > 无论是使用了`AssignerWithPeriodicWatermarks`的`getCurrentWatermark`还是`AssignerWithPunctuatedWatermarks`的`checkAndGetNextWatermark`,当它们`return > null`时下游算子拿到的水印都显示为`No > Watermark`,我在下游算子中打印出`context.currentWatermark()`发现都是`Long.MIN_VALUE`的值。 > 看了这两个接口文档,不太理解这里的`no new watermark will be > generated.`是什么意思,一般理解为不会生成新的水印那之前的水印就应该被继续传下去吧,但实际看来是下游接收不到水印(`no > watermark`?)。 > @tison,你那边是尝试过这种写法吗,如果可以的话,能麻烦给一个简单的demo吗。 > > > 感谢帮助!🙏🙏🙏 > ``` > public interface AssignerWithPeriodicWatermarks<T> extends > TimestampAssigner<T> { > > /** > * Returns the current watermark. This method is periodically called by the > * system to retrieve the current watermark. The method may return {@code > null} to > * indicate that no new Watermark is available. > * > * <p>The returned watermark will be emitted only if it is non-null and > its timestamp > * is larger than that of the previously emitted watermark (to preserve > the contract of > * ascending watermarks). If the current watermark is still > * identical to the previous one, no progress in event time has happened > since > * the previous call to this method. If a null value is returned, or the > timestamp > * of the returned watermark is smaller than that of the last emitted one, > then no > * new watermark will be generated. > * > * <p>The interval in which this method is called and Watermarks are > generated > * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. > * > * @see org.apache.flink.streaming.api.watermark.Watermark > * @see ExecutionConfig#getAutoWatermarkInterval() > * > * @return {@code Null}, if no watermark should be emitted, or the next > watermark to emit. > */ > @Nullable > Watermark getCurrentWatermark(); > } > ``` > > > 原始邮件 > 发件人: tison<[hidden email]> > 收件人: user-zh<[hidden email]> > 发送时间: 2020年4月16日(周四) 20:33 > 主题: Re: 为消息分配时间戳但不想重新分配水印 > > > 在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark 的。另外语义上使用 > AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang < > [hidden email]> 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink > > stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 > > 为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink > > stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark > > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. > > 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。 > > > > 我现在只能使用`assignTimestampsAndWatermarks` > > 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答! |
从语义上说,已经有产生 Watermark 的逻辑了,如果 forward 此前的 watermark
在其他一些用户场景下或许也不合适。从另一个角度考虑你也可以把 watermark 带在 element 上,实现 AssignerWithPunctuatedWatermarks 的 Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp); 方法时从 element 取出来 Best, tison. tison <[hidden email]> 于2020年4月16日周四 下午10:36写道: > 喔,看差了...Flink 原生没有这样的支持,不过可以扩展基础的算子来实现你要的逻辑 > > 参考 assignTimestampsAndWatermarks > 的实现,以及 TimestampsAndPunctuatedWatermarksOperator,不要重写 processWatermark > 方法,应该可以实现。DataStream 方面调用更基础的 transform 方法 > > 如果你觉得这是一个合理的需求,应该开箱即用,也可以到 Flink 的 JIRA(issue)列表上提 > https://jira.apache.org/jira/projects/FLINK/issues 注册 id 后创建 JIRA 单即可 > > Best, > tison. > > > taowang <[hidden email]> 于2020年4月16日周四 下午10:12写道: > >> 感谢回复,但是很抱歉我试了一下发现不可以。 >> 无论是使用了`AssignerWithPeriodicWatermarks`的`getCurrentWatermark`还是`AssignerWithPunctuatedWatermarks`的`checkAndGetNextWatermark`,当它们`return >> null`时下游算子拿到的水印都显示为`No >> Watermark`,我在下游算子中打印出`context.currentWatermark()`发现都是`Long.MIN_VALUE`的值。 >> 看了这两个接口文档,不太理解这里的`no new watermark will be >> generated.`是什么意思,一般理解为不会生成新的水印那之前的水印就应该被继续传下去吧,但实际看来是下游接收不到水印(`no >> watermark`?)。 >> @tison,你那边是尝试过这种写法吗,如果可以的话,能麻烦给一个简单的demo吗。 >> >> >> 感谢帮助!🙏🙏🙏 >> ``` >> public interface AssignerWithPeriodicWatermarks<T> extends >> TimestampAssigner<T> { >> >> /** >> * Returns the current watermark. This method is periodically called by >> the >> * system to retrieve the current watermark. The method may return {@code >> null} to >> * indicate that no new Watermark is available. >> * >> * <p>The returned watermark will be emitted only if it is non-null and >> its timestamp >> * is larger than that of the previously emitted watermark (to preserve >> the contract of >> * ascending watermarks). If the current watermark is still >> * identical to the previous one, no progress in event time has happened >> since >> * the previous call to this method. If a null value is returned, or the >> timestamp >> * of the returned watermark is smaller than that of the last emitted >> one, then no >> * new watermark will be generated. >> * >> * <p>The interval in which this method is called and Watermarks are >> generated >> * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. >> * >> * @see org.apache.flink.streaming.api.watermark.Watermark >> * @see ExecutionConfig#getAutoWatermarkInterval() >> * >> * @return {@code Null}, if no watermark should be emitted, or the next >> watermark to emit. >> */ >> @Nullable >> Watermark getCurrentWatermark(); >> } >> ``` >> >> >> 原始邮件 >> 发件人: tison<[hidden email]> >> 收件人: user-zh<[hidden email]> >> 发送时间: 2020年4月16日(周四) 20:33 >> 主题: Re: 为消息分配时间戳但不想重新分配水印 >> >> >> 在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark 的。另外语义上使用 >> AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang < >> [hidden email]> 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink > >> stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 > >> 为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink > >> stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark >> > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. > >> 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。 >> > > > 我现在只能使用`assignTimestampsAndWatermarks` > >> 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答! > > |
In reply to this post by taowang
嗯嗯,还是十分感谢。
那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。 打扰各位了,祝好!~ 原始邮件 发件人: tison<[hidden email]> 收件人: user-zh<[hidden email]> 发送时间: 2020年4月16日(周四) 22:39 主题: Re: 为消息分配时间戳但不想重新分配水印 正在载入邮件原文… |
In reply to this post by taowang
请问,你对DataStream重新声明时间列和水印,生效吗?
taowang <[hidden email]> 于2020年4月16日周四 下午10:49写道: > 嗯嗯,还是十分感谢。 > 那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。 > > > 打扰各位了,祝好!~ > > > 原始邮件 > 发件人: tison<[hidden email]> > 收件人: user-zh<[hidden email]> > 发送时间: 2020年4月16日(周四) 22:39 > 主题: Re: 为消息分配时间戳但不想重新分配水印 > > > 正在载入邮件原文… |
In reply to this post by taowang
是的,是生效了的。没见到别人这样用过,在我的场景里这个操作是实现了我需要的逻辑,但会不会引起其他问题,我暂时还没有发现。
原始邮件 发件人: lec ssmi<[hidden email]> 收件人: flink-user-cn<[hidden email]> 发送时间: 2020年4月17日(周五) 09:25 主题: Re: 为消息分配时间戳但不想重新分配水印 请问,你对DataStream重新声明时间列和水印,生效吗? taowang <[hidden email]> 于2020年4月16日周四 下午10:49写道: > 嗯嗯,还是十分感谢。 > 那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。 > > > 打扰各位了,祝好!~ > > > 原始邮件 > 发件人: tison<[hidden email]> > 收件人: user-zh<[hidden email]> > 发送时间: 2020年4月16日(周四) 22:39 > 主题: Re: 为消息分配时间戳但不想重新分配水印 > > > 正在载入邮件原文… |
In reply to this post by taowang
watermark的重新生成,是将新的watermark也加入到watermark队列,然后选出一个最值,还是将原先的watermark直接丢弃掉,改用新的?
taowang <[hidden email]> 于2020年4月17日周五 上午10:46写道: > 是的,是生效了的。没见到别人这样用过,在我的场景里这个操作是实现了我需要的逻辑,但会不会引起其他问题,我暂时还没有发现。 > > > 原始邮件 > 发件人: lec ssmi<[hidden email]> > 收件人: flink-user-cn<[hidden email]> > 发送时间: 2020年4月17日(周五) 09:25 > 主题: Re: 为消息分配时间戳但不想重新分配水印 > > > 请问,你对DataStream重新声明时间列和水印,生效吗? taowang <[hidden email]> > 于2020年4月16日周四 下午10:49写道: > 嗯嗯,还是十分感谢。 > > 那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。 > > > 打扰各位了,祝好!~ > > > > 原始邮件 > 发件人: tison<[hidden email]> > 收件人: user-zh< > [hidden email]> > 发送时间: 2020年4月16日(周四) 22:39 > 主题: Re: > 为消息分配时间戳但不想重新分配水印 > > > 正在载入邮件原文… |
In reply to this post by taowang
我的测试结果时,把原先的丢弃掉,完全采用最新的逻辑。
原始邮件 发件人: lec ssmi<[hidden email]> 收件人: flink-user-cn<[hidden email]> 发送时间: 2020年4月17日(周五) 14:43 主题: Re: 为消息分配时间戳但不想重新分配水印 watermark的重新生成,是将新的watermark也加入到watermark队列,然后选出一个最值,还是将原先的watermark直接丢弃掉,改用新的? taowang <[hidden email]> 于2020年4月17日周五 上午10:46写道: > 是的,是生效了的。没见到别人这样用过,在我的场景里这个操作是实现了我需要的逻辑,但会不会引起其他问题,我暂时还没有发现。 > > > 原始邮件 > 发件人: lec ssmi<[hidden email]> > 收件人: flink-user-cn<[hidden email]> > 发送时间: 2020年4月17日(周五) 09:25 > 主题: Re: 为消息分配时间戳但不想重新分配水印 > > > 请问,你对DataStream重新声明时间列和水印,生效吗? taowang <[hidden email]> > 于2020年4月16日周四 下午10:49写道: > 嗯嗯,还是十分感谢。 > > 那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。 > > > 打扰各位了,祝好!~ > > > > 原始邮件 > 发件人: tison<[hidden email]> > 收件人: user-zh< > [hidden email]> > 发送时间: 2020年4月16日(周四) 22:39 > 主题: Re: > 为消息分配时间戳但不想重新分配水印 > > > 正在载入邮件原文… |
Free forum by Nabble | Edit this page |