Flink SQL join question


Flink SQL join question

huang
Hi all,


I'm doing a dual-stream join with Flink SQL. I'd like both streams to keep only the latest record for each key, so that each join effectively emits only the latest record. Is this scenario supported in SQL?


thanks

Re: Flink SQL join question

Zhenghua Gao
You could try the firstRow/lastRow optimization [1] in the Blink planner of the latest Flink 1.9 and see whether it meets your needs. The current limitation is that deduplication can only be based on proctime.

e.g.
1. SELECT a, b, c FROM (
     SELECT a, b, c, proctime,
            ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as row_num
     FROM MyTable
   ) WHERE row_num <= 1
   will be converted to StreamExecDeduplicate which keeps first row.
2. SELECT a, b, c FROM (
     SELECT a, b, c, proctime,
            ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) as row_num
     FROM MyTable
   ) WHERE row_num <= 1
   will be converted to StreamExecDeduplicate which keeps last row.


[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecDeduplicateRule.scala
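
A rough sketch, following the lastRow pattern above, of how the original question's scenario might be expressed: deduplicate each stream to its latest row per key, then join the two results. The table names StreamA/StreamB and the columns k, val, proctime are hypothetical placeholders, and whether the planner actually applies StreamExecDeduplicate depends on the query shape:

  -- Hypothetical sketch: keep only the latest row per key on each stream,
  -- then join the two deduplicated views on the key column.
  SELECT a.k, a.val AS a_val, b.val AS b_val
  FROM (
    SELECT k, val FROM (
      SELECT k, val,
             ROW_NUMBER() OVER (PARTITION BY k ORDER BY proctime DESC) AS row_num
      FROM StreamA
    ) WHERE row_num <= 1
  ) AS a
  JOIN (
    SELECT k, val FROM (
      SELECT k, val,
             ROW_NUMBER() OVER (PARTITION BY k ORDER BY proctime DESC) AS row_num
      FROM StreamB
    ) WHERE row_num <= 1
  ) AS b
  ON a.k = b.k

In this shape each side should only ever hold one row per key, so an update on either stream should update the joined result rather than producing one output per historical record.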

Best Regards,
Zhenghua Gao


On Tue, Aug 6, 2019 at 2:28 PM huang <[hidden email]> wrote:

> Hi all,
>
>
> I'm doing a dual-stream join with Flink SQL. I'd like both streams to keep
> only the latest record for each key, so that each join effectively emits only the latest record. Is this scenario supported in SQL?
>
>
> thanks