Flink 维表延迟join

classic Classic list List threaded Threaded
18 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 维表延迟join

郑斌斌
小伙伴们:
      大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。

Thanks&Regards
Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

Benchao Li-2
Hi,

我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。

其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。

郑斌斌 <[hidden email]> 于2020年8月27日周四 上午9:23写道:

> 小伙伴们:
>
> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
> 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
>
> Thanks&Regards



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

china_tao
In reply to this post by 郑斌斌
一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。
个人推荐先用null存储,后期etl补录。



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

Leonard Xu

Hi, all

看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了,
感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗?

Best
Leonard

> 在 2020年8月27日,10:39,china_tao <[hidden email]> 写道:
>
> 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
> join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。
> 个人推荐先用null存储,后期etl补录。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

Jark
Administrator
@Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。

On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]> wrote:

>
> Hi, all
>
> 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了,
> 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗?
>
> Best
> Leonard
>
> > 在 2020年8月27日,10:39,china_tao <[hidden email]> 写道:
> >
> > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
> >
> join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。
> > 个人推荐先用null存储,后期etl补录。
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

Leonard Xu
多谢 Jark 提议

Issue[1] 建好了, 大家可以在issue下讨论。

祝好
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-19063 <https://issues.apache.org/jira/browse/FLINK-19063>


> 在 2020年8月27日,19:54,Jark Wu <[hidden email]> 写道:
>
> @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。
>
> On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email] <mailto:[hidden email]>> wrote:
>
> Hi, all
>
> 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了,
> 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗?
>
> Best
> Leonard
>
> > 在 2020年8月27日,10:39,china_tao <[hidden email] <mailto:[hidden email]>> 写道:
> >
> > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
> > join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。
> > 个人推荐先用null存储,后期etl补录。
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/ <http://apache-flink.147419.n8.nabble.com/>
>

Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

魏烽
In reply to this post by 郑斌斌
各位好:

现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案:

1.直接将维表一次性加载到内存进行join;

2.使用mysql或者hbase外部存储每条数据进行查询join;

但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度

请问各位有什么更好的建议嘛?感谢

 原始邮件
发件人: Leonard Xu<[hidden email]>
收件人: Jark Wu<[hidden email]>
抄送: user-zh<[hidden email]>; Benchao Li<[hidden email]>
发送时间: 2020年8月27日(周四) 20:11
主题: Re: Flink 维表延迟join


多谢 Jark 提议

Issue[1] 建好了, 大家可以在issue下讨论。

祝好
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-19063 <https://issues.apache.org/jira/browse/FLINK-19063>


> 在 2020年8月27日,19:54,Jark Wu <[hidden email]<mailto:[hidden email]>> 写道:
>
> @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。
>
> On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]<mailto:[hidden email]> <mailto:[hidden email]<mailto:[hidden email]>>> wrote:
>
> Hi, all
>
> 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了,
> 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗?
>
> Best
> Leonard
>
> > 在 2020年8月27日,10:39,china_tao <[hidden email]<mailto:[hidden email]> <mailto:[hidden email]<mailto:[hidden email]>>> 写道:
> >
> > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
> > join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。
> > 个人推荐先用null存储,后期etl补录。
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/ <http://apache-flink.147419.n8.nabble.com/>
>



Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

Benchao Li-2
这种场景是不是可以直接用批的方式来处理呢?那就不需要维表了,正常join即可,
这样可以用到批里面一些特有的join优化。

魏烽 <[hidden email]> 于2020年8月28日周五 上午9:58写道:

> 各位好:
>
>
> 现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案:
>
> 1.直接将维表一次性加载到内存进行join;
>
> 2.使用mysql或者hbase外部存储每条数据进行查询join;
>
> 但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度
>
> 请问各位有什么更好的建议嘛?感谢
>
>  原始邮件
> 发件人: Leonard Xu<[hidden email]>
> 收件人: Jark Wu<[hidden email]>
> 抄送: user-zh<[hidden email]>; Benchao Li<[hidden email]>
> 发送时间: 2020年8月27日(周四) 20:11
> 主题: Re: Flink 维表延迟join
>
>
> 多谢 Jark 提议
>
> Issue[1] 建好了, 大家可以在issue下讨论。
>
> 祝好
> Leonard
> [1] https://issues.apache.org/jira/browse/FLINK-19063 <
> https://issues.apache.org/jira/browse/FLINK-19063>
>
>
> > 在 2020年8月27日,19:54,Jark Wu <[hidden email]<mailto:[hidden email]>>
> 写道:
> >
> > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。
> >
> > On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]<mailto:
> [hidden email]> <mailto:[hidden email]<mailto:[hidden email]>>>
> wrote:
> >
> > Hi, all
> >
> > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了,
> > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗?
> >
> > Best
> > Leonard
> >
> > > 在 2020年8月27日,10:39,china_tao <[hidden email]<mailto:
> [hidden email]> <mailto:[hidden email]<mailto:
> [hidden email]>>> 写道:
> > >
> > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
> > >
> join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。
> > > 个人推荐先用null存储,后期etl补录。
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/ <
> http://apache-flink.147419.n8.nabble.com/>
> >
>
>
>
>

--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

Jark
Administrator
我们有一个 issue 就是去支持维表 join 文件系统的(包括静态的,和缓慢变化的)。
https://issues.apache.org/jira/browse/FLINK-17397
一种思路是复用现有的 filesystem scan 的能力,temporal join 算子将 filesystem 的数据放在 state
里,然后主流数据去 lookup state。

Best,
Jark

On Fri, 28 Aug 2020 at 10:28, Benchao Li <[hidden email]> wrote:

> 这种场景是不是可以直接用批的方式来处理呢?那就不需要维表了,正常join即可,
> 这样可以用到批里面一些特有的join优化。
>
> 魏烽 <[hidden email]> 于2020年8月28日周五 上午9:58写道:
>
>> 各位好:
>>
>>
>> 现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案:
>>
>> 1.直接将维表一次性加载到内存进行join;
>>
>> 2.使用mysql或者hbase外部存储每条数据进行查询join;
>>
>> 但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度
>>
>> 请问各位有什么更好的建议嘛?感谢
>>
>>  原始邮件
>> 发件人: Leonard Xu<[hidden email]>
>> 收件人: Jark Wu<[hidden email]>
>> 抄送: user-zh<[hidden email]>; Benchao Li<[hidden email]>
>> 发送时间: 2020年8月27日(周四) 20:11
>> 主题: Re: Flink 维表延迟join
>>
>>
>> 多谢 Jark 提议
>>
>> Issue[1] 建好了, 大家可以在issue下讨论。
>>
>> 祝好
>> Leonard
>> [1] https://issues.apache.org/jira/browse/FLINK-19063 <
>> https://issues.apache.org/jira/browse/FLINK-19063>
>>
>>
>> > 在 2020年8月27日,19:54,Jark Wu <[hidden email]<mailto:[hidden email]>>
>> 写道:
>> >
>> > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。
>> >
>> > On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]<mailto:
>> [hidden email]> <mailto:[hidden email]<mailto:[hidden email]>>>
>> wrote:
>> >
>> > Hi, all
>> >
>> > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了,
>> > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗?
>> >
>> > Best
>> > Leonard
>> >
>> > > 在 2020年8月27日,10:39,china_tao <[hidden email]<mailto:
>> [hidden email]> <mailto:[hidden email]<mailto:
>> [hidden email]>>> 写道:
>> > >
>> > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
>> > >
>> join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。
>> > > 个人推荐先用null存储,后期etl补录。
>> > >
>> > >
>> > >
>> > > --
>> > > Sent from: http://apache-flink.147419.n8.nabble.com/ <
>> http://apache-flink.147419.n8.nabble.com/>
>> >
>>
>>
>>
>>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

魏烽
In reply to this post by 郑斌斌
您好,感谢回复:

意思是原始数据用批的方式,然后维表也用批的方式读取进行,进行两个流的join嘛

但是为了程序的扩展性,原始数据只能按照流的方式来读,因为会有其它的需要流的场景

 原始邮件
发件人: Benchao Li<[hidden email]>
收件人: user-zh<[hidden email]>
抄送: Jark Wu<[hidden email]>
发送时间: 2020年8月28日(周五) 10:28
主题: Re: Flink 维表延迟join


这种场景是不是可以直接用批的方式来处理呢?那就不需要维表了,正常join即可,
这样可以用到批里面一些特有的join优化。

魏烽 <[hidden email]<mailto:[hidden email]>> 于2020年8月28日周五 上午9:58写道:

> 各位好:
>
>
> 现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案:
>
> 1.直接将维表一次性加载到内存进行join;
>
> 2.使用mysql或者hbase外部存储每条数据进行查询join;
>
> 但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度
>
> 请问各位有什么更好的建议嘛?感谢
>
>  原始邮件
> 发件人: Leonard Xu<[hidden email]<mailto:[hidden email]>>
> 收件人: Jark Wu<[hidden email]<mailto:[hidden email]>>
> 抄送: user-zh<[hidden email]<mailto:[hidden email]>>; Benchao Li<[hidden email]<mailto:[hidden email]>>
> 发送时间: 2020年8月27日(周四) 20:11
> 主题: Re: Flink 维表延迟join
>
>
> 多谢 Jark 提议
>
> Issue[1] 建好了, 大家可以在issue下讨论。
>
> 祝好
> Leonard
> [1] https://issues.apache.org/jira/browse/FLINK-19063 <
> https://issues.apache.org/jira/browse/FLINK-19063>
>
>
> > 在 2020年8月27日,19:54,Jark Wu <[hidden email]<mailto:[hidden email]><mailto:[hidden email]<mailto:[hidden email]>>>
> 写道:
> >
> > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。
> >
> > On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]<mailto:[hidden email]><mailto:
> [hidden email]<mailto:[hidden email]>> <mailto:[hidden email]<mailto:[hidden email]><mailto:[hidden email]<mailto:[hidden email]>>>>
> wrote:
> >
> > Hi, all
> >
> > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了,
> > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗?
> >
> > Best
> > Leonard
> >
> > > 在 2020年8月27日,10:39,china_tao <[hidden email]<mailto:[hidden email]><mailto:
> [hidden email]<mailto:[hidden email]>> <mailto:[hidden email]<mailto:[hidden email]><mailto:
> [hidden email]<mailto:[hidden email]>>>> 写道:
> > >
> > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
> > >
> join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。
> > > 个人推荐先用null存储,后期etl补录。
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/ <
> http://apache-flink.147419.n8.nabble.com/>
> >
>
>
>
>

--

Best,
Benchao Li

Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink 维表延迟join

gongpulin@163.com
采用自定义Source读取维表,做成流,双流join,在KeyedCoProcessFunction中将维表维护在state里面



[hidden email]
 
发件人: 魏烽
发送时间: 2020-08-28 10:49
收件人: user-zh
抄送: Jark Wu
主题: Re: Flink 维表延迟join
您好,感谢回复:
 
意思是原始数据用批的方式,然后维表也用批的方式读取进行,进行两个流的join嘛
 
但是为了程序的扩展性,原始数据只能按照流的方式来读,因为会有其它的需要流的场景
 
原始邮件
发件人: Benchao Li<[hidden email]>
收件人: user-zh<[hidden email]>
抄送: Jark Wu<[hidden email]>
发送时间: 2020年8月28日(周五) 10:28
主题: Re: Flink 维表延迟join
 
 
这种场景是不是可以直接用批的方式来处理呢?那就不需要维表了,正常join即可,
这样可以用到批里面一些特有的join优化。
 
魏烽 <[hidden email]<mailto:[hidden email]>> 于2020年8月28日周五 上午9:58写道:
 

> 各位好:
>
>
> 现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案:
>
> 1.直接将维表一次性加载到内存进行join;
>
> 2.使用mysql或者hbase外部存储每条数据进行查询join;
>
> 但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度
>
> 请问各位有什么更好的建议嘛?感谢
>
>  原始邮件
> 发件人: Leonard Xu<[hidden email]<mailto:[hidden email]>>
> 收件人: Jark Wu<[hidden email]<mailto:[hidden email]>>
> 抄送: user-zh<[hidden email]<mailto:[hidden email]>>; Benchao Li<[hidden email]<mailto:[hidden email]>>
> 发送时间: 2020年8月27日(周四) 20:11
> 主题: Re: Flink 维表延迟join
>
>
> 多谢 Jark 提议
>
> Issue[1] 建好了, 大家可以在issue下讨论。
>
> 祝好
> Leonard
> [1] https://issues.apache.org/jira/browse/FLINK-19063 <
> https://issues.apache.org/jira/browse/FLINK-19063>
>
>
> > 在 2020年8月27日,19:54,Jark Wu <[hidden email]<mailto:[hidden email]><mailto:[hidden email]<mailto:[hidden email]>>>
> 写道:
> >
> > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。
> >
> > On Thu, 27 Aug 2020 at 11:12, Leonard Xu <[hidden email]<mailto:[hidden email]><mailto:
> [hidden email]<mailto:[hidden email]>> <mailto:[hidden email]<mailto:[hidden email]><mailto:[hidden email]<mailto:[hidden email]>>>>
> wrote:
> >
> > Hi, all
> >
> > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了,
> > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗?
> >
> > Best
> > Leonard
> >
> > > 在 2020年8月27日,10:39,china_tao <[hidden email]<mailto:[hidden email]><mailto:
> [hidden email]<mailto:[hidden email]>> <mailto:[hidden email]<mailto:[hidden email]><mailto:
> [hidden email]<mailto:[hidden email]>>>> 写道:
> > >
> > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
> > >
> join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。
> > > 个人推荐先用null存储,后期etl补录。
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/ <
> http://apache-flink.147419.n8.nabble.com/>
> >
>
>
>
>
 
--
 
Best,
Benchao Li
 
Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

LakeShen
In reply to this post by Benchao Li-2
Hi,

    Benchao,这种发送到另外一个 topic
,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。

Benchao Li <[hidden email]> 于2020年8月27日周四 上午10:08写道:

> Hi,
>
> 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。
>
> 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。
>
> 郑斌斌 <[hidden email]> 于2020年8月27日周四 上午9:23写道:
>
> > 小伙伴们:
> >
> >
> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
> > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
> >
> > Thanks&Regards
>
>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

LakeShen
In reply to this post by 郑斌斌
Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们这边业务方也有维表延迟关联的述求,比如 HBase 维表的数据关联。
当前的场景是,有一张实时维表,消费 mysql binlog,然后业务方 etl 后,输出到
HBase。然后业务方还有另外一个流,会去关联这张维表,由于存在某些 rowkey
的数据还没有写入到 hbase,而另外一条流就去关联 HBase,却没有数据。所以业务方希望有个延迟维表关联功能,比如 10
分钟后在进行关联,目前我们考虑借助 Timer 来实现的,
社区如果有这个功能的话,我觉得对于 Flink 使用方会有很大帮助的。我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。

[1] https://issues.apache.org/jira/browse/FLINK-19063

Best,
LakeShen


郑斌斌 <[hidden email]> 于2020年8月27日周四 上午9:23写道:

> 小伙伴们:
>
> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
> 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
>
> Thanks&Regards
Reply | Threaded
Open this post in threaded view
|

回复: Flink 维表延迟join

Suhan
In reply to this post by LakeShen
我们生产环境也遇到了同样的问题,除了benchao说的用算子来做延迟join外。可以使用rocketmq的延迟发送功能来存放维度拼接失败的消息。然后flink再同时消费kafka + rocket mq的数据。
我建议在生产环境使用,因为有的时候flink侧很难判断到底哪种情况是拼接失败,是异常呢,还是空值呢,还是返回结果不符合要求。我觉得在用户层面做会比较灵活。单独的容灾队列对于下游的处理也提供了更多的方式。
&nbsp;




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2021年2月25日(星期四) 中午12:00
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: Flink 维表延迟join



Hi,

&nbsp;&nbsp;&nbsp; Benchao,这种发送到另外一个 topic
,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。

Benchao Li <[hidden email]&gt; 于2020年8月27日周四 上午10:08写道:

&gt; Hi,
&gt;
&gt; 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。
&gt;
&gt; 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。
&gt;
&gt; 郑斌斌 <[hidden email]&gt; 于2020年8月27日周四 上午9:23写道:
&gt;
&gt; &gt; 小伙伴们:
&gt; &gt;
&gt; &gt;
&gt; 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
&gt; &gt; 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
&gt; &gt;
&gt; &gt; Thanks&amp;Regards
&gt;
&gt;
&gt;
&gt; --
&gt;
&gt; Best,
&gt; Benchao Li
&gt;
Reply | Threaded
Open this post in threaded view
|

Re: Flink 维表延迟join

Smile
In reply to this post by 郑斌斌
我们也有遇到维度关联的时候维表比流晚到的情况,不过我们的流一般都有唯一键,因此目前是用 session window 来自定义控制流的落后时间来关联。

具体思路:
对于流,按唯一键 Group By,对 process time 开延迟时间长度的 session window。
因为 Group By 了唯一键,每个 session 窗口里面一定只有一条数据,所以一定是到了设定的 size 就会触发的。

比如期望流数据到达后一分钟再去关联维表,SQL 如下:

-- 原始 SQL
SELECT
  s.id,
  s.another_field,
  r.dim_id_value
FROM
  source_table AS s
JOIN dim_table
 for system_time AS of s.proctime AS r
ON r.id = s.id

-- 修改后的延迟1分钟进行关联的 SQL
SELECT
  s.id,
  s.another_field,
  r.dim_id_value
FROM (
  SELECT
  id,
  MAX(another_field) AS another_field,
  SESSION_PROCTIME(proctime, INTERVAL '1' MINUTE)AS proctime
  FROM source_table
  GROUP BY SESSION(proctime, INTERVAL '1' MINUTE), id
) AS s
JOIN dim_table
 for system_time AS of s.proctime AS r
ON r.id = s.id




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

回复: Flink 维表延迟join

Jason Lee
In reply to this post by Suhan


我么生产环境同样遇到这种问题,因为上有流数据到了,但是维表数据未更新导致丢失部分数据,请问大家现在有好的解决方案去解决Flink SQL 维表延迟Join的问题了吗?


有解决方案的小伙伴能分享下嘛?
| |
JasonLee
|
|
[hidden email]
|
签名由网易邮箱大师定制


在2021年02月25日 14:40,Suhan<[hidden email]> 写道:
我们生产环境也遇到了同样的问题,除了benchao说的用算子来做延迟join外。可以使用rocketmq的延迟发送功能来存放维度拼接失败的消息。然后flink再同时消费kafka + rocket mq的数据。
我建议在生产环境使用,因为有的时候flink侧很难判断到底哪种情况是拼接失败,是异常呢,还是空值呢,还是返回结果不符合要求。我觉得在用户层面做会比较灵活。单独的容灾队列对于下游的处理也提供了更多的方式。
&nbsp;




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2021年2月25日(星期四) 中午12:00
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: Flink 维表延迟join



Hi,

&nbsp;&nbsp;&nbsp; Benchao,这种发送到另外一个 topic
,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。

Benchao Li <[hidden email]&gt; 于2020年8月27日周四 上午10:08写道:

&gt; Hi,
&gt;
&gt; 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。
&gt;
&gt; 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。
&gt;
&gt; 郑斌斌 <[hidden email]&gt; 于2020年8月27日周四 上午9:23写道:
&gt;
&gt; &gt; 小伙伴们:
&gt; &gt;
&gt; &gt;
&gt; 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
&gt; &gt; 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
&gt; &gt;
&gt; &gt; Thanks&amp;Regards
&gt;
&gt;
&gt;
&gt; --
&gt;
&gt; Best,
&gt; Benchao Li
&gt;
Reply | Threaded
Open this post in threaded view
|

Re:回复: Flink 维表延迟join

Michael Ran
双流join,或者局部更新能解决
在 2021-06-07 16:35:10,"Jason Lee" <[hidden email]> 写道:

>
>
>我么生产环境同样遇到这种问题,因为上有流数据到了,但是维表数据未更新导致丢失部分数据,请问大家现在有好的解决方案去解决Flink SQL 维表延迟Join的问题了吗?
>
>
>有解决方案的小伙伴能分享下嘛?
>| |
>JasonLee
>|
>|
>[hidden email]
>|
>签名由网易邮箱大师定制
>
>
>在2021年02月25日 14:40,Suhan<[hidden email]> 写道:
>我们生产环境也遇到了同样的问题,除了benchao说的用算子来做延迟join外。可以使用rocketmq的延迟发送功能来存放维度拼接失败的消息。然后flink再同时消费kafka + rocket mq的数据。
>我建议在生产环境使用,因为有的时候flink侧很难判断到底哪种情况是拼接失败,是异常呢,还是空值呢,还是返回结果不符合要求。我觉得在用户层面做会比较灵活。单独的容灾队列对于下游的处理也提供了更多的方式。
>&nbsp;
>
>
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
>发送时间:&nbsp;2021年2月25日(星期四) 中午12:00
>收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
>主题:&nbsp;Re: Flink 维表延迟join
>
>
>
>Hi,
>
>&nbsp;&nbsp;&nbsp; Benchao,这种发送到另外一个 topic
>,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。
>
>Benchao Li <[hidden email]&gt; 于2020年8月27日周四 上午10:08写道:
>
>&gt; Hi,
>&gt;
>&gt; 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。
>&gt;
>&gt; 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。
>&gt;
>&gt; 郑斌斌 <[hidden email]&gt; 于2020年8月27日周四 上午9:23写道:
>&gt;
>&gt; &gt; 小伙伴们:
>&gt; &gt;
>&gt; &gt;
>&gt; 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
>&gt; &gt; 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
>&gt; &gt;
>&gt; &gt; Thanks&amp;Regards
>&gt;
>&gt;
>&gt;
>&gt; --
>&gt;
>&gt; Best,
>&gt; Benchao Li
>&gt;
Reply | Threaded
Open this post in threaded view
|

Re:回复: Flink 维表延迟join

casel.chen
In reply to this post by Jason Lee
双流interval join是否可行呢?

















在 2021-06-07 16:35:10,"Jason Lee" <[hidden email]> 写道:

>
>
>我么生产环境同样遇到这种问题,因为上有流数据到了,但是维表数据未更新导致丢失部分数据,请问大家现在有好的解决方案去解决Flink SQL 维表延迟Join的问题了吗?
>
>
>有解决方案的小伙伴能分享下嘛?
>| |
>JasonLee
>|
>|
>[hidden email]
>|
>签名由网易邮箱大师定制
>
>
>在2021年02月25日 14:40,Suhan<[hidden email]> 写道:
>我们生产环境也遇到了同样的问题,除了benchao说的用算子来做延迟join外。可以使用rocketmq的延迟发送功能来存放维度拼接失败的消息。然后flink再同时消费kafka + rocket mq的数据。
>我建议在生产环境使用,因为有的时候flink侧很难判断到底哪种情况是拼接失败,是异常呢,还是空值呢,还是返回结果不符合要求。我觉得在用户层面做会比较灵活。单独的容灾队列对于下游的处理也提供了更多的方式。
>&nbsp;
>
>
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
>发送时间:&nbsp;2021年2月25日(星期四) 中午12:00
>收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
>主题:&nbsp;Re: Flink 维表延迟join
>
>
>
>Hi,
>
>&nbsp;&nbsp;&nbsp; Benchao,这种发送到另外一个 topic
>,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。
>
>Benchao Li <[hidden email]&gt; 于2020年8月27日周四 上午10:08写道:
>
>&gt; Hi,
>&gt;
>&gt; 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。
>&gt;
>&gt; 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。
>&gt;
>&gt; 郑斌斌 <[hidden email]&gt; 于2020年8月27日周四 上午9:23写道:
>&gt;
>&gt; &gt; 小伙伴们:
>&gt; &gt;
>&gt; &gt;
>&gt; 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
>&gt; &gt; 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
>&gt; &gt;
>&gt; &gt; Thanks&amp;Regards
>&gt;
>&gt;
>&gt;
>&gt; --
>&gt;
>&gt; Best,
>&gt; Benchao Li
>&gt;