Hi,all
我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 FLink sql有什么方案实现吗? 感谢您的回复 |
补充一下:明确的说是维表的join,A表关联B表(维表),想让A表延迟一会再关联B表
> 2020年7月3日 下午5:53,admin <[hidden email]> 写道: > > Hi,all > 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 > FLink sql有什么方案实现吗? > > 感谢您的回复 |
设置一个窗口时间,如果有需要取最新的,可以再做一下处理。
------------------ 原始邮件 ------------------ 发件人: admin <[hidden email]> 发送时间: 2020年7月3日 18:01 收件人: user-zh <[hidden email]> 主题: 回复:Flink sql 主动使数据延时一段时间有什么方案 补充一下:明确的说是维表的join,A表关联B表(维表),想让A表延迟一会再关联B表 > 2020年7月3日 下午5:53,admin <[hidden email]> 写道: > > Hi,all > 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 > FLink sql有什么方案实现吗? > > 感谢您的回复 |
窗口得用group by,字段会丢失 在2020年07月03日 19:11,kcz 写道: 设置一个窗口时间,如果有需要取最新的,可以再做一下处理。 ------------------ 原始邮件 ------------------ 发件人: admin <[hidden email]> 发送时间: 2020年7月3日 18:01 收件人: user-zh <[hidden email]> 主题: 回复:Flink sql 主动使数据延时一段时间有什么方案 补充一下:明确的说是维表的join,A表关联B表(维表),想让A表延迟一会再关联B表 > 2020年7月3日 下午5:53,admin <[hidden email]> 写道: > > Hi,all > 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 > FLink sql有什么方案实现吗? > > 感谢您的回复 |
In reply to this post by admin
我们也遇到过类似场景。
如果你的数据里面有事件时间,可以写个udf来判断下,如果事件时间-当前时间 小于某个阈值,可以sleep一下。 如果没有事件时间,那就不太好直接搞了,我们是自己搞了一个延迟维表,就是保证每条数据进到维表join算子后等固定时间后再去join。 admin <[hidden email]> 于2020年7月3日周五 下午5:54写道: > Hi,all > 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 > FLink sql有什么方案实现吗? > > 感谢您的回复 -- Best, Benchao Li |
In reply to this post by admin
还有一种很有意思的思路。
如果你不考虑数据是否会有乱序,而且保证维表中一定能join到结果,那就可以正常join,如果join不到,就把这条数据再发送到source的topic里,实现了一种类似于for循环的能力。。 admin <[hidden email]> 于2020年7月3日周五 下午5:54写道: > Hi,all > 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 > FLink sql有什么方案实现吗? > > 感谢您的回复 -- Best, Benchao Li |
In reply to this post by admin
奥,对,还有一种思路。如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了。
admin <[hidden email]> 于2020年7月3日周五 下午5:54写道: > Hi,all > 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 > FLink sql有什么方案实现吗? > > 感谢您的回复 -- Best, Benchao Li |
Hi
刚刚本超说了四种方法, 方法1.使用udf,查不到 sleep 等一下在查 方法2.在 join operator处数据等一会再去查 方法3.如果没有 join 上,就把数据发到source,循环join。 方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了 上述方法应该都能实现相同的效果。 我们也实现了一种方法。这种方法是扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait 的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。 Best forideal 在 2020-07-03 23:05:06,"Benchao Li" <[hidden email]> 写道: >奥,对,还有一种思路。如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了。 > >admin <[hidden email]> 于2020年7月3日周五 下午5:54写道: > >> Hi,all >> 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 >> FLink sql有什么方案实现吗? >> >> 感谢您的回复 > > > >-- > >Best, >Benchao Li |
感谢benchao和forideal的方案,
方法1.使用udf,查不到 sleep 等一下在查 --这个可以尝试 方法2.在 join operator处数据等一会再去查 —我们使用的是flink sql,不是streaming,所以该方案可能行不通 方法3.如果没有 join 上,就把数据发到source,循环join。 --我们这个维表join的场景类似filter的功能,如果关联上则主流数据就不处理了,所以不一定非要join上,只是想延迟一会提升准确率 方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了 —我们的source是kafka,好像不支持kafka的功能 方法5.扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait 的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。 --这个方案需要修改源码,也可以试一下 Best Sun.Zhu | | Sun.Zhu | | [hidden email] | 签名由网易邮箱大师定制 在2020年07月3日 23:26,forideal<[hidden email]> 写道: Hi 刚刚本超说了四种方法, 方法1.使用udf,查不到 sleep 等一下在查 方法2.在 join operator处数据等一会再去查 方法3.如果没有 join 上,就把数据发到source,循环join。 方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了 上述方法应该都能实现相同的效果。 我们也实现了一种方法。这种方法是扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait 的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。 Best forideal 在 2020-07-03 23:05:06,"Benchao Li" <[hidden email]> 写道: 奥,对,还有一种思路。如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了。 admin <[hidden email]> 于2020年7月3日周五 下午5:54写道: Hi,all 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 FLink sql有什么方案实现吗? 感谢您的回复 -- Best, Benchao Li |
Hi Sun ZHu,
关于方法4,我记得kafka有时间轮功能,可以做到延迟消息的,可以了解一下。 Best, shizk233 Sun.Zhu <[hidden email]> 于2020年7月4日周六 上午12:23写道: > 感谢benchao和forideal的方案, > 方法1.使用udf,查不到 sleep 等一下在查 > --这个可以尝试 > 方法2.在 join operator处数据等一会再去查 > —我们使用的是flink sql,不是streaming,所以该方案可能行不通 > 方法3.如果没有 join 上,就把数据发到source,循环join。 > --我们这个维表join的场景类似filter的功能,如果关联上则主流数据就不处理了,所以不一定非要join上,只是想延迟一会提升准确率 > 方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了 > —我们的source是kafka,好像不支持kafka的功能 > 方法5.扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait > 的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。 > --这个方案需要修改源码,也可以试一下 > > > Best > Sun.Zhu > | | > Sun.Zhu > | > | > [hidden email] > | > 签名由网易邮箱大师定制 > > > 在2020年07月3日 23:26,forideal<[hidden email]> 写道: > Hi > > > > > 刚刚本超说了四种方法, > > 方法1.使用udf,查不到 sleep 等一下在查 > > 方法2.在 join operator处数据等一会再去查 > > 方法3.如果没有 join 上,就把数据发到source,循环join。 > > 方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了 > > > > > 上述方法应该都能实现相同的效果。 > > > > > 我们也实现了一种方法。这种方法是扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait > 的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。 > > > > > Best forideal > > > > > > > > > > > > > > > > > > 在 2020-07-03 23:05:06,"Benchao Li" <[hidden email]> 写道: > 奥,对,还有一种思路。如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了。 > > admin <[hidden email]> 于2020年7月3日周五 下午5:54写道: > > Hi,all > 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 > FLink sql有什么方案实现吗? > > 感谢您的回复 > > > > -- > > Best, > Benchao Li > |
Free forum by Nabble | Edit this page |