小伙伴们:
大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。 Thanks&Regards |
Hi,
我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。 郑斌斌 <[hidden email]> 于2020年8月27日周四 上午9:23写道: > 小伙伴们: > > 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。 > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。 > > Thanks&Regards -- Best, Benchao Li |
In reply to this post by 郑斌斌
一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。 个人推荐先用null存储,后期etl补录。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi, all 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了, 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗? Best Leonard > 在 2020年8月27日,10:39,china_tao <[hidden email]> 写道: > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left > join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。 > 个人推荐先用null存储,后期etl补录。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Administrator
|
@Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。
On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]> wrote: > > Hi, all > > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了, > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗? > > Best > Leonard > > > 在 2020年8月27日,10:39,china_tao <[hidden email]> 写道: > > > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left > > > join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。 > > 个人推荐先用null存储,后期etl补录。 > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > |
多谢 Jark 提议
Issue[1] 建好了, 大家可以在issue下讨论。 祝好 Leonard [1] https://issues.apache.org/jira/browse/FLINK-19063 <https://issues.apache.org/jira/browse/FLINK-19063> > 在 2020年8月27日,19:54,Jark Wu <[hidden email]> 写道: > > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。 > > On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email] <mailto:[hidden email]>> wrote: > > Hi, all > > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了, > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗? > > Best > Leonard > > > 在 2020年8月27日,10:39,china_tao <[hidden email] <mailto:[hidden email]>> 写道: > > > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left > > join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。 > > 个人推荐先用null存储,后期etl补录。 > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ <http://apache-flink.147419.n8.nabble.com/> > |
In reply to this post by 郑斌斌
各位好:
现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案: 1.直接将维表一次性加载到内存进行join; 2.使用mysql或者hbase外部存储每条数据进行查询join; 但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度 请问各位有什么更好的建议嘛?感谢 原始邮件 发件人: Leonard Xu<[hidden email]> 收件人: Jark Wu<[hidden email]> 抄送: user-zh<[hidden email]>; Benchao Li<[hidden email]> 发送时间: 2020年8月27日(周四) 20:11 主题: Re: Flink 维表延迟join 多谢 Jark 提议 Issue[1] 建好了, 大家可以在issue下讨论。 祝好 Leonard [1] https://issues.apache.org/jira/browse/FLINK-19063 <https://issues.apache.org/jira/browse/FLINK-19063> > 在 2020年8月27日,19:54,Jark Wu <[hidden email]<mailto:[hidden email]>> 写道: > > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。 > > On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]<mailto:[hidden email]> <mailto:[hidden email]<mailto:[hidden email]>>> wrote: > > Hi, all > > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了, > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗? > > Best > Leonard > > > 在 2020年8月27日,10:39,china_tao <[hidden email]<mailto:[hidden email]> <mailto:[hidden email]<mailto:[hidden email]>>> 写道: > > > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left > > join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。 > > 个人推荐先用null存储,后期etl补录。 > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ <http://apache-flink.147419.n8.nabble.com/> > |
这种场景是不是可以直接用批的方式来处理呢?那就不需要维表了,正常join即可,
这样可以用到批里面一些特有的join优化。 魏烽 <[hidden email]> 于2020年8月28日周五 上午9:58写道: > 各位好: > > > 现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案: > > 1.直接将维表一次性加载到内存进行join; > > 2.使用mysql或者hbase外部存储每条数据进行查询join; > > 但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度 > > 请问各位有什么更好的建议嘛?感谢 > > 原始邮件 > 发件人: Leonard Xu<[hidden email]> > 收件人: Jark Wu<[hidden email]> > 抄送: user-zh<[hidden email]>; Benchao Li<[hidden email]> > 发送时间: 2020年8月27日(周四) 20:11 > 主题: Re: Flink 维表延迟join > > > 多谢 Jark 提议 > > Issue[1] 建好了, 大家可以在issue下讨论。 > > 祝好 > Leonard > [1] https://issues.apache.org/jira/browse/FLINK-19063 < > https://issues.apache.org/jira/browse/FLINK-19063> > > > > 在 2020年8月27日,19:54,Jark Wu <[hidden email]<mailto:[hidden email]>> > 写道: > > > > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。 > > > > On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]<mailto: > [hidden email]> <mailto:[hidden email]<mailto:[hidden email]>>> > wrote: > > > > Hi, all > > > > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了, > > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗? > > > > Best > > Leonard > > > > > 在 2020年8月27日,10:39,china_tao <[hidden email]<mailto: > [hidden email]> <mailto:[hidden email]<mailto: > [hidden email]>>> 写道: > > > > > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left > > > > join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。 > > > 个人推荐先用null存储,后期etl补录。 > > > > > > > > > > > > -- > > > Sent from: http://apache-flink.147419.n8.nabble.com/ < > http://apache-flink.147419.n8.nabble.com/> > > > > > > -- Best, Benchao Li |
Administrator
|
我们有一个 issue 就是去支持维表 join 文件系统的(包括静态的,和缓慢变化的)。
https://issues.apache.org/jira/browse/FLINK-17397 一种思路是复用现有的 filesystem scan 的能力,temporal join 算子将 filesystem 的数据放在 state 里,然后主流数据去 lookup state。 Best, Jark On Fri, 28 Aug 2020 at 10:28, Benchao Li <[hidden email]> wrote: > 这种场景是不是可以直接用批的方式来处理呢?那就不需要维表了,正常join即可, > 这样可以用到批里面一些特有的join优化。 > > 魏烽 <[hidden email]> 于2020年8月28日周五 上午9:58写道: > >> 各位好: >> >> >> 现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案: >> >> 1.直接将维表一次性加载到内存进行join; >> >> 2.使用mysql或者hbase外部存储每条数据进行查询join; >> >> 但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度 >> >> 请问各位有什么更好的建议嘛?感谢 >> >> 原始邮件 >> 发件人: Leonard Xu<[hidden email]> >> 收件人: Jark Wu<[hidden email]> >> 抄送: user-zh<[hidden email]>; Benchao Li<[hidden email]> >> 发送时间: 2020年8月27日(周四) 20:11 >> 主题: Re: Flink 维表延迟join >> >> >> 多谢 Jark 提议 >> >> Issue[1] 建好了, 大家可以在issue下讨论。 >> >> 祝好 >> Leonard >> [1] https://issues.apache.org/jira/browse/FLINK-19063 < >> https://issues.apache.org/jira/browse/FLINK-19063> >> >> >> > 在 2020年8月27日,19:54,Jark Wu <[hidden email]<mailto:[hidden email]>> >> 写道: >> > >> > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。 >> > >> > On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]<mailto: >> [hidden email]> <mailto:[hidden email]<mailto:[hidden email]>>> >> wrote: >> > >> > Hi, all >> > >> > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了, >> > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗? >> > >> > Best >> > Leonard >> > >> > > 在 2020年8月27日,10:39,china_tao <[hidden email]<mailto: >> [hidden email]> <mailto:[hidden email]<mailto: >> [hidden email]>>> 写道: >> > > >> > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left >> > > >> join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。 >> > > 个人推荐先用null存储,后期etl补录。 >> > > >> > > >> > > >> > > -- >> > > Sent from: http://apache-flink.147419.n8.nabble.com/ < >> http://apache-flink.147419.n8.nabble.com/> >> > >> >> >> >> > > -- > > Best, > Benchao Li > |
In reply to this post by 郑斌斌
您好,感谢回复:
意思是原始数据用批的方式,然后维表也用批的方式读取进行,进行两个流的join嘛 但是为了程序的扩展性,原始数据只能按照流的方式来读,因为会有其它的需要流的场景 原始邮件 发件人: Benchao Li<[hidden email]> 收件人: user-zh<[hidden email]> 抄送: Jark Wu<[hidden email]> 发送时间: 2020年8月28日(周五) 10:28 主题: Re: Flink 维表延迟join 这种场景是不是可以直接用批的方式来处理呢?那就不需要维表了,正常join即可, 这样可以用到批里面一些特有的join优化。 魏烽 <[hidden email]<mailto:[hidden email]>> 于2020年8月28日周五 上午9:58写道: > 各位好: > > > 现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案: > > 1.直接将维表一次性加载到内存进行join; > > 2.使用mysql或者hbase外部存储每条数据进行查询join; > > 但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度 > > 请问各位有什么更好的建议嘛?感谢 > > 原始邮件 > 发件人: Leonard Xu<[hidden email]<mailto:[hidden email]>> > 收件人: Jark Wu<[hidden email]<mailto:[hidden email]>> > 抄送: user-zh<[hidden email]<mailto:[hidden email]>>; Benchao Li<[hidden email]<mailto:[hidden email]>> > 发送时间: 2020年8月27日(周四) 20:11 > 主题: Re: Flink 维表延迟join > > > 多谢 Jark 提议 > > Issue[1] 建好了, 大家可以在issue下讨论。 > > 祝好 > Leonard > [1] https://issues.apache.org/jira/browse/FLINK-19063 < > https://issues.apache.org/jira/browse/FLINK-19063> > > > > 在 2020年8月27日,19:54,Jark Wu <[hidden email]<mailto:[hidden email]><mailto:[hidden email]<mailto:[hidden email]>>> > 写道: > > > > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。 > > > > On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]<mailto:[hidden email]><mailto: > [hidden email]<mailto:[hidden email]>> <mailto:[hidden email]<mailto:[hidden email]><mailto:[hidden email]<mailto:[hidden email]>>>> > wrote: > > > > Hi, all > > > > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了, > > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗? > > > > Best > > Leonard > > > > > 在 2020年8月27日,10:39,china_tao <[hidden email]<mailto:[hidden email]><mailto: > [hidden email]<mailto:[hidden email]>> <mailto:[hidden email]<mailto:[hidden email]><mailto: > [hidden email]<mailto:[hidden email]>>>> 写道: > > > > > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left > > > > join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。 > > > 个人推荐先用null存储,后期etl补录。 > > > > > > > > > > > > -- > > > Sent from: http://apache-flink.147419.n8.nabble.com/ < > http://apache-flink.147419.n8.nabble.com/> > > > > > > -- Best, Benchao Li |
采用自定义Source读取维表,做成流,双流join,在KeyedCoProcessFunction中将维表维护在state里面
[hidden email] 发件人: 魏烽 发送时间: 2020-08-28 10:49 收件人: user-zh 抄送: Jark Wu 主题: Re: Flink 维表延迟join 您好,感谢回复: 意思是原始数据用批的方式,然后维表也用批的方式读取进行,进行两个流的join嘛 但是为了程序的扩展性,原始数据只能按照流的方式来读,因为会有其它的需要流的场景 原始邮件 发件人: Benchao Li<[hidden email]> 收件人: user-zh<[hidden email]> 抄送: Jark Wu<[hidden email]> 发送时间: 2020年8月28日(周五) 10:28 主题: Re: Flink 维表延迟join 这种场景是不是可以直接用批的方式来处理呢?那就不需要维表了,正常join即可, 这样可以用到批里面一些特有的join优化。 魏烽 <[hidden email]<mailto:[hidden email]>> 于2020年8月28日周五 上午9:58写道: > 各位好: > > > 现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案: > > 1.直接将维表一次性加载到内存进行join; > > 2.使用mysql或者hbase外部存储每条数据进行查询join; > > 但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度 > > 请问各位有什么更好的建议嘛?感谢 > > 原始邮件 > 发件人: Leonard Xu<[hidden email]<mailto:[hidden email]>> > 收件人: Jark Wu<[hidden email]<mailto:[hidden email]>> > 抄送: user-zh<[hidden email]<mailto:[hidden email]>>; Benchao Li<[hidden email]<mailto:[hidden email]>> > 发送时间: 2020年8月27日(周四) 20:11 > 主题: Re: Flink 维表延迟join > > > 多谢 Jark 提议 > > Issue[1] 建好了, 大家可以在issue下讨论。 > > 祝好 > Leonard > [1] https://issues.apache.org/jira/browse/FLINK-19063 < > https://issues.apache.org/jira/browse/FLINK-19063> > > > > 在 2020年8月27日,19:54,Jark Wu <[hidden email]<mailto:[hidden email]><mailto:[hidden email]<mailto:[hidden email]>>> > 写道: > > > > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。 > > > > On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]<mailto:[hidden email]><mailto: > [hidden email]<mailto:[hidden email]>> <mailto:[hidden email]<mailto:[hidden email]><mailto:[hidden email]<mailto:[hidden email]>>>> > wrote: > > > > Hi, all > > > > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了, > > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗? > > > > Best > > Leonard > > > > > 在 2020年8月27日,10:39,china_tao <[hidden email]<mailto:[hidden email]><mailto: > [hidden email]<mailto:[hidden email]>> <mailto:[hidden email]<mailto:[hidden email]><mailto: > [hidden email]<mailto:[hidden email]>>>> 写道: > > > > > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left > > > > join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。 > > > 个人推荐先用null存储,后期etl补录。 > > > > > > > > > > > > -- > > > Sent from: http://apache-flink.147419.n8.nabble.com/ < > http://apache-flink.147419.n8.nabble.com/> > > > > > > -- Best, Benchao Li |
In reply to this post by Benchao Li-2
Hi,
Benchao,这种发送到另外一个 topic ,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。 Benchao Li <[hidden email]> 于2020年8月27日周四 上午10:08写道: > Hi, > > 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。 > > 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。 > > 郑斌斌 <[hidden email]> 于2020年8月27日周四 上午9:23写道: > > > 小伙伴们: > > > > > 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。 > > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。 > > > > Thanks&Regards > > > > -- > > Best, > Benchao Li > |
In reply to this post by 郑斌斌
Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们这边业务方也有维表延迟关联的述求,比如 HBase 维表的数据关联。 当前的场景是,有一张实时维表,消费 mysql binlog,然后业务方 etl 后,输出到 HBase。然后业务方还有另外一个流,会去关联这张维表,由于存在某些 rowkey 的数据还没有写入到 hbase,而另外一条流就去关联 HBase,却没有数据。所以业务方希望有个延迟维表关联功能,比如 10 分钟后在进行关联,目前我们考虑借助 Timer 来实现的, 社区如果有这个功能的话,我觉得对于 Flink 使用方会有很大帮助的。我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。 [1] https://issues.apache.org/jira/browse/FLINK-19063 Best, LakeShen 郑斌斌 <[hidden email]> 于2020年8月27日周四 上午9:23写道: > 小伙伴们: > > 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。 > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。 > > Thanks&Regards |
In reply to this post by LakeShen
我们生产环境也遇到了同样的问题,除了benchao说的用算子来做延迟join外。可以使用rocketmq的延迟发送功能来存放维度拼接失败的消息。然后flink再同时消费kafka + rocket mq的数据。
我建议在生产环境使用,因为有的时候flink侧很难判断到底哪种情况是拼接失败,是异常呢,还是空值呢,还是返回结果不符合要求。我觉得在用户层面做会比较灵活。单独的容灾队列对于下游的处理也提供了更多的方式。 ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2021年2月25日(星期四) 中午12:00 收件人: "user-zh"<[hidden email]>; 主题: Re: Flink 维表延迟join Hi, Benchao,这种发送到另外一个 topic ,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。 Benchao Li <[hidden email]> 于2020年8月27日周四 上午10:08写道: > Hi, > > 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。 > > 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。 > > 郑斌斌 <[hidden email]> 于2020年8月27日周四 上午9:23写道: > > > 小伙伴们: > > > > > 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。 > > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。 > > > > Thanks&Regards > > > > -- > > Best, > Benchao Li > |
In reply to this post by 郑斌斌
我们也有遇到维度关联的时候维表比流晚到的情况,不过我们的流一般都有唯一键,因此目前是用 session window 来自定义控制流的落后时间来关联。
具体思路: 对于流,按唯一键 Group By,对 process time 开延迟时间长度的 session window。 因为 Group By 了唯一键,每个 session 窗口里面一定只有一条数据,所以一定是到了设定的 size 就会触发的。 比如期望流数据到达后一分钟再去关联维表,SQL 如下: -- 原始 SQL SELECT s.id, s.another_field, r.dim_id_value FROM source_table AS s JOIN dim_table for system_time AS of s.proctime AS r ON r.id = s.id -- 修改后的延迟1分钟进行关联的 SQL SELECT s.id, s.another_field, r.dim_id_value FROM ( SELECT id, MAX(another_field) AS another_field, SESSION_PROCTIME(proctime, INTERVAL '1' MINUTE)AS proctime FROM source_table GROUP BY SESSION(proctime, INTERVAL '1' MINUTE), id ) AS s JOIN dim_table for system_time AS of s.proctime AS r ON r.id = s.id -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by Suhan
我么生产环境同样遇到这种问题,因为上有流数据到了,但是维表数据未更新导致丢失部分数据,请问大家现在有好的解决方案去解决Flink SQL 维表延迟Join的问题了吗? 有解决方案的小伙伴能分享下嘛? | | JasonLee | | [hidden email] | 签名由网易邮箱大师定制 在2021年02月25日 14:40,Suhan<[hidden email]> 写道: 我们生产环境也遇到了同样的问题,除了benchao说的用算子来做延迟join外。可以使用rocketmq的延迟发送功能来存放维度拼接失败的消息。然后flink再同时消费kafka + rocket mq的数据。 我建议在生产环境使用,因为有的时候flink侧很难判断到底哪种情况是拼接失败,是异常呢,还是空值呢,还是返回结果不符合要求。我觉得在用户层面做会比较灵活。单独的容灾队列对于下游的处理也提供了更多的方式。 ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2021年2月25日(星期四) 中午12:00 收件人: "user-zh"<[hidden email]>; 主题: Re: Flink 维表延迟join Hi, Benchao,这种发送到另外一个 topic ,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。 Benchao Li <[hidden email]> 于2020年8月27日周四 上午10:08写道: > Hi, > > 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。 > > 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。 > > 郑斌斌 <[hidden email]> 于2020年8月27日周四 上午9:23写道: > > > 小伙伴们: > > > > > 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。 > > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。 > > > > Thanks&Regards > > > > -- > > Best, > Benchao Li > |
双流join,或者局部更新能解决
在 2021-06-07 16:35:10,"Jason Lee" <[hidden email]> 写道: > > >我么生产环境同样遇到这种问题,因为上有流数据到了,但是维表数据未更新导致丢失部分数据,请问大家现在有好的解决方案去解决Flink SQL 维表延迟Join的问题了吗? > > >有解决方案的小伙伴能分享下嘛? >| | >JasonLee >| >| >[hidden email] >| >签名由网易邮箱大师定制 > > >在2021年02月25日 14:40,Suhan<[hidden email]> 写道: >我们生产环境也遇到了同样的问题,除了benchao说的用算子来做延迟join外。可以使用rocketmq的延迟发送功能来存放维度拼接失败的消息。然后flink再同时消费kafka + rocket mq的数据。 >我建议在生产环境使用,因为有的时候flink侧很难判断到底哪种情况是拼接失败,是异常呢,还是空值呢,还是返回结果不符合要求。我觉得在用户层面做会比较灵活。单独的容灾队列对于下游的处理也提供了更多的方式。 > > > > > >------------------ 原始邮件 ------------------ >发件人: "user-zh" <[hidden email]>; >发送时间: 2021年2月25日(星期四) 中午12:00 >收件人: "user-zh"<[hidden email]>; > >主题: Re: Flink 维表延迟join > > > >Hi, > > Benchao,这种发送到另外一个 topic >,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。 > >Benchao Li <[hidden email]> 于2020年8月27日周四 上午10:08写道: > >> Hi, >> >> 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。 >> >> 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。 >> >> 郑斌斌 <[hidden email]> 于2020年8月27日周四 上午9:23写道: >> >> > 小伙伴们: >> > >> > >> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。 >> > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。 >> > >> > Thanks&Regards >> >> >> >> -- >> >> Best, >> Benchao Li >> |
In reply to this post by Jason Lee
双流interval join是否可行呢?
在 2021-06-07 16:35:10,"Jason Lee" <[hidden email]> 写道: > > >我么生产环境同样遇到这种问题,因为上有流数据到了,但是维表数据未更新导致丢失部分数据,请问大家现在有好的解决方案去解决Flink SQL 维表延迟Join的问题了吗? > > >有解决方案的小伙伴能分享下嘛? >| | >JasonLee >| >| >[hidden email] >| >签名由网易邮箱大师定制 > > >在2021年02月25日 14:40,Suhan<[hidden email]> 写道: >我们生产环境也遇到了同样的问题,除了benchao说的用算子来做延迟join外。可以使用rocketmq的延迟发送功能来存放维度拼接失败的消息。然后flink再同时消费kafka + rocket mq的数据。 >我建议在生产环境使用,因为有的时候flink侧很难判断到底哪种情况是拼接失败,是异常呢,还是空值呢,还是返回结果不符合要求。我觉得在用户层面做会比较灵活。单独的容灾队列对于下游的处理也提供了更多的方式。 > > > > > >------------------ 原始邮件 ------------------ >发件人: "user-zh" <[hidden email]>; >发送时间: 2021年2月25日(星期四) 中午12:00 >收件人: "user-zh"<[hidden email]>; > >主题: Re: Flink 维表延迟join > > > >Hi, > > Benchao,这种发送到另外一个 topic >,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。 > >Benchao Li <[hidden email]> 于2020年8月27日周四 上午10:08写道: > >> Hi, >> >> 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。 >> >> 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。 >> >> 郑斌斌 <[hidden email]> 于2020年8月27日周四 上午9:23写道: >> >> > 小伙伴们: >> > >> > >> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。 >> > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。 >> > >> > Thanks&Regards >> >> >> >> -- >> >> Best, >> Benchao Li >> |
Free forum by Nabble | Edit this page |