關於如何在流數據上計算 Top K 的應用問題

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

關於如何在流數據上計算 Top K 的應用問題

Tony Wei
Hi,

最近正在研究 Top K 的問題,在研究中找到了 Blink SQL 可以透過維護一個儲存 K 的最大紀錄的
”堆”來優化底下這類 SQL,不過我認為這只能針對 `score` 只會增加不減少的情況。

> SELECT user_id, score
> FROM (
>   SELECT *,
>     ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
>   FROM user_scores)
> WHERE row_num <= 3
>
>
我的問題是當如果這樣的計算是應用在流數據上,且 `score` 可能隨時間增加或是“減少”的話,例
如底下這類的 SQL,能有什麼樣的優化?

> SELECT user_id, score
> FROM (
>   SELECT *,
>     ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
>   FROM (
>   SELECT user_id, LAST_VAL(score) AS score
>   FROM user_scores
>   GROUP BY user_id))
> WHERE row_num <= 3
>
> SQL 中的 `user_scores` 可以當作是從 DataStream 直接轉換過來的 Dynamic Table,
`LAST_VAL`假設是一種 UDAF,可以挑出目前最新的值。所以,可以想像這張 table 的 user's
`score` 是會隨時間變化增減。

上面所說堆的優化無法處理這樣的問題,底下舉個例子。假設今天有一個 top-3 的堆中已經存放
了三個使用者:A, B, C,各自的 scores 是:4, 3, 2,接下來收到了一個使用者 D 和他的分數是
1 的話,這個時候演算法會直接忽略掉 D,因為他不在 top-3 的範圍內。但是當下一個如果收到
的是一個更新 A 使用者的 score 為 0 的紀錄的話,這個時候理論上我們知道 top-3 會改為 B, C,
D,但是在維護 top-3 的堆中我們無力找回被忽略的使用者 D。這樣的優化在 batch mode 是沒有
問題的,因為最新的 score 在有限的數據中會是固定的不動的。

不過當處理流數據,我目前只想到這種應用最終可能需要退回成存放全部使用者 scores 才有辦
法處理,才能隨時計算出正確的 top-k。所以我想請教各位大牛有沒有什麼樣的優化方式可以處
理這樣的問題,讓狀態不需要存到全部資料?當然這個問題不侷限在 SQL,如果有任何實作在
DataStream 上的優化都是可接受。感謝大家幫忙。

Best Regards,
Tony Wei
Reply | Threaded
Open this post in threaded view
|

Re: 關於如何在流數據上計算 Top K 的應用問題

Caizhi Weng
Hi Tony!

其实 Flink 对 Top-N 问题并没有很 fancy 的实现... Flink 把 Top-N 问题分成三种情况:

1. 数据只添加,不更新不删除(就像 batch mode)
这种情况的实现是 AppendOnlyTopNFunction,就像你说的一样使用一个 Map
来维护。不能直接使用堆来维护的原因是:因为要告知下游每一条记录的精确排名。

2. 数据可能有添加和更新
这种情况的实现是 UpdatableTopNFunction,但是这个类开头的注释表明了它只能用于以下特殊情况:
    * 数据更新后排名只能变小不能变大;
    * 数据的 sort key 要 unique;
    * 不能删数据或者撤回数据。
这种情况就避免了你上面说的排名变大,导致掉出 Top-N 的情况。还是可以用一个 Map 来维护。

3. 数据可以添加、更新和删除
这种情况的实现是 RetractableTopNFunction。因为数据更新 / 删除后可能会掉出 Top-N,要找新数据补进来,那么只能从
state 里捞应该补进来的数据。当前由于社区没有 SortedMapState 的实现,现在是用 ValueState<SortedMap<>> 存
state。每次读 state 都是把整个 state 拿出来读的,所以数据量大了其实没办法用... 等社区引入了 SortedMapState
以后,就可以用 iterator 只读取前面一些我们想要补进来的数据。

Tony Wei <[hidden email]> 于2019年7月11日周四 上午9:49写道:

> Hi,
>
> 最近正在研究 Top K 的問題,在研究中找到了 Blink SQL 可以透過維護一個儲存 K 的最大紀錄的
> ”堆”來優化底下這類 SQL,不過我認為這只能針對 `score` 只會增加不減少的情況。
>
> > SELECT user_id, score
> > FROM (
> >   SELECT *,
> >     ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
> >   FROM user_scores)
> > WHERE row_num <= 3
> >
> >
> 我的問題是當如果這樣的計算是應用在流數據上,且 `score` 可能隨時間增加或是“減少”的話,例
> 如底下這類的 SQL,能有什麼樣的優化?
>
> > SELECT user_id, score
> > FROM (
> >   SELECT *,
> >     ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
> >   FROM (
> >       SELECT user_id, LAST_VAL(score) AS score
> >       FROM user_scores
> >       GROUP BY user_id))
> > WHERE row_num <= 3
> >
> > SQL 中的 `user_scores` 可以當作是從 DataStream 直接轉換過來的 Dynamic Table,
> `LAST_VAL`假設是一種 UDAF,可以挑出目前最新的值。所以,可以想像這張 table 的 user's
> `score` 是會隨時間變化增減。
>
> 上面所說堆的優化無法處理這樣的問題,底下舉個例子。假設今天有一個 top-3 的堆中已經存放
> 了三個使用者:A, B, C,各自的 scores 是:4, 3, 2,接下來收到了一個使用者 D 和他的分數是
> 1 的話,這個時候演算法會直接忽略掉 D,因為他不在 top-3 的範圍內。但是當下一個如果收到
> 的是一個更新 A 使用者的 score 為 0 的紀錄的話,這個時候理論上我們知道 top-3 會改為 B, C,
> D,但是在維護 top-3 的堆中我們無力找回被忽略的使用者 D。這樣的優化在 batch mode 是沒有
> 問題的,因為最新的 score 在有限的數據中會是固定的不動的。
>
> 不過當處理流數據,我目前只想到這種應用最終可能需要退回成存放全部使用者 scores 才有辦
> 法處理,才能隨時計算出正確的 top-k。所以我想請教各位大牛有沒有什麼樣的優化方式可以處
> 理這樣的問題,讓狀態不需要存到全部資料?當然這個問題不侷限在 SQL,如果有任何實作在
> DataStream 上的優化都是可接受。感謝大家幫忙。
>
> Best Regards,
> Tony Wei
>
Reply | Threaded
Open this post in threaded view
|

Re: 關於如何在流數據上計算 Top K 的應用問題

Tony Wei
Hi Caizhi,

謝謝你的回答。你的第三點想法給了我蠻大的啟發,我本來設想的情況是能否避免把全部使用者
資料都存放在 state 來解決這個問題,但聽起來這部分是避免不了的。如果我沒有理解錯,你的
作法比較像是將全部使用者的排名資訊都存放在 state,在使用了 rocksdb state backend 的狀況
下,這些資料都不需要在記憶體中實體化,所以不會受限記憶體的擴展性,只有那些需要被放入
Top-N 的資料會被讀取出來存放在一個 in-memory 的堆中做為加速運算的優化。

在我們目前的應用場景中,精確排名不是必要的資訊,可能還有一些不是硬性的需求來鬆綁這個
問題的限制,雖然沒有很有把握但或許可以根據你的想法實現一個專門針對我們應用情境的優化。

撇開上述特殊的情況,我另外好奇的是第一點中維護的 map state 要記錄精確的排名這件事的細
節,想知道如果更新是循序變化的,如果添加了一筆新的紀錄,可能會導致多個紀錄的排名需要
加一或是減一,這部分是不是也需要遍歷整個 map 去判斷是否有增減,針對變動的部分通知下
游?

Best Regards,
Tony Wei

Caizhi Weng <[hidden email]> 於 2019年7月12日 週五 上午11:36寫道:

> Hi Tony!
>
> 其实 Flink 对 Top-N 问题并没有很 fancy 的实现... Flink 把 Top-N 问题分成三种情况:
>
> 1. 数据只添加,不更新不删除(就像 batch mode)
> 这种情况的实现是 AppendOnlyTopNFunction,就像你说的一样使用一个 Map
> 来维护。不能直接使用堆来维护的原因是:因为要告知下游每一条记录的精确排名。
>
> 2. 数据可能有添加和更新
> 这种情况的实现是 UpdatableTopNFunction,但是这个类开头的注释表明了它只能用于以下特殊情况:
>     * 数据更新后排名只能变小不能变大;
>     * 数据的 sort key 要 unique;
>     * 不能删数据或者撤回数据。
> 这种情况就避免了你上面说的排名变大,导致掉出 Top-N 的情况。还是可以用一个 Map 来维护。
>
> 3. 数据可以添加、更新和删除
> 这种情况的实现是 RetractableTopNFunction。因为数据更新 / 删除后可能会掉出 Top-N,要找新数据补进来,那么只能从
> state 里捞应该补进来的数据。当前由于社区没有 SortedMapState 的实现,现在是用 ValueState<SortedMap<>> 存
> state。每次读 state 都是把整个 state 拿出来读的,所以数据量大了其实没办法用... 等社区引入了 SortedMapState
> 以后,就可以用 iterator 只读取前面一些我们想要补进来的数据。
>
> Tony Wei <[hidden email]> 于2019年7月11日周四 上午9:49写道:
>
> > Hi,
> >
> > 最近正在研究 Top K 的問題,在研究中找到了 Blink SQL 可以透過維護一個儲存 K 的最大紀錄的
> > ”堆”來優化底下這類 SQL,不過我認為這只能針對 `score` 只會增加不減少的情況。
> >
> > > SELECT user_id, score
> > > FROM (
> > >   SELECT *,
> > >     ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
> > >   FROM user_scores)
> > > WHERE row_num <= 3
> > >
> > >
> > 我的問題是當如果這樣的計算是應用在流數據上,且 `score` 可能隨時間增加或是“減少”的話,例
> > 如底下這類的 SQL,能有什麼樣的優化?
> >
> > > SELECT user_id, score
> > > FROM (
> > >   SELECT *,
> > >     ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
> > >   FROM (
> > >       SELECT user_id, LAST_VAL(score) AS score
> > >       FROM user_scores
> > >       GROUP BY user_id))
> > > WHERE row_num <= 3
> > >
> > > SQL 中的 `user_scores` 可以當作是從 DataStream 直接轉換過來的 Dynamic Table,
> > `LAST_VAL`假設是一種 UDAF,可以挑出目前最新的值。所以,可以想像這張 table 的 user's
> > `score` 是會隨時間變化增減。
> >
> > 上面所說堆的優化無法處理這樣的問題,底下舉個例子。假設今天有一個 top-3 的堆中已經存放
> > 了三個使用者:A, B, C,各自的 scores 是:4, 3, 2,接下來收到了一個使用者 D 和他的分數是
> > 1 的話,這個時候演算法會直接忽略掉 D,因為他不在 top-3 的範圍內。但是當下一個如果收到
> > 的是一個更新 A 使用者的 score 為 0 的紀錄的話,這個時候理論上我們知道 top-3 會改為 B, C,
> > D,但是在維護 top-3 的堆中我們無力找回被忽略的使用者 D。這樣的優化在 batch mode 是沒有
> > 問題的,因為最新的 score 在有限的數據中會是固定的不動的。
> >
> > 不過當處理流數據,我目前只想到這種應用最終可能需要退回成存放全部使用者 scores 才有辦
> > 法處理,才能隨時計算出正確的 top-k。所以我想請教各位大牛有沒有什麼樣的優化方式可以處
> > 理這樣的問題,讓狀態不需要存到全部資料?當然這個問題不侷限在 SQL,如果有任何實作在
> > DataStream 上的優化都是可接受。感謝大家幫忙。
> >
> > Best Regards,
> > Tony Wei
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 關於如何在流數據上計算 Top K 的應用問題

Caizhi Weng
Hi Tony!

這些資料都不需要在記憶體中實體化,所以不會受限記憶體的擴展性,只有那些需要被放入 Top-N 的資料會被讀取出來存放在一個 in-memory
> 的堆中做為加速運算的優化。


前两种情况下,由于不需要从老数据中捞记录回 Top-N,state 里其实也只要放 Top-N 的数据。Top-N
的数据先在内存里维护,checkpoint 的时候同步到 state。

第三种情况下,你的说法是社区引入 SortedMapState 后可以实现的情况,现在由于暂时没有引入 SortedMapState,每次读
state 还是会把所有数据都读出来(具体来说有两个 state,一个 ValueState 有序地存储了每个 sort key 的记录数量,另一个
MapState 无序地根据 sort key 取出具体的记录列表),所以其实当前实现会受内存限制... 还是要等引入 SortedMapState
就可以不受内存限制了,但是每次从 state 里取数据仍然很耗时。

如果添加了一筆新的紀錄,可能會導致多個紀錄的排名需要加一或是減一,這部分是不是也需要遍歷整個 map 去判斷是否有增減,針對變動的部分通知下游?


是的,Flink 现在会把每一条有排名变化的数据通知下游(具体实现可参考 AppendOnlyTopNFunction 的
processElementWithRowNumber 方法)。这个主要是为了 row_number()
等需要精确知道排名的情况准备的。如果只是单纯的 limit 3 这样的情况,只会通知下游进入和离开 Top-N
的那两条数据(processElementWithoutRowNumber 方法)。

Tony Wei <[hidden email]> 于2019年7月12日周五 下午12:15写道:

> Hi Caizhi,
>
> 謝謝你的回答。你的第三點想法給了我蠻大的啟發,我本來設想的情況是能否避免把全部使用者
> 資料都存放在 state 來解決這個問題,但聽起來這部分是避免不了的。如果我沒有理解錯,你的
> 作法比較像是將全部使用者的排名資訊都存放在 state,在使用了 rocksdb state backend 的狀況
> 下,這些資料都不需要在記憶體中實體化,所以不會受限記憶體的擴展性,只有那些需要被放入
> Top-N 的資料會被讀取出來存放在一個 in-memory 的堆中做為加速運算的優化。
>
> 在我們目前的應用場景中,精確排名不是必要的資訊,可能還有一些不是硬性的需求來鬆綁這個
> 問題的限制,雖然沒有很有把握但或許可以根據你的想法實現一個專門針對我們應用情境的優化。
>
> 撇開上述特殊的情況,我另外好奇的是第一點中維護的 map state 要記錄精確的排名這件事的細
> 節,想知道如果更新是循序變化的,如果添加了一筆新的紀錄,可能會導致多個紀錄的排名需要
> 加一或是減一,這部分是不是也需要遍歷整個 map 去判斷是否有增減,針對變動的部分通知下
> 游?
>
> Best Regards,
> Tony Wei
>
> Caizhi Weng <[hidden email]> 於 2019年7月12日 週五 上午11:36寫道:
>
> > Hi Tony!
> >
> > 其实 Flink 对 Top-N 问题并没有很 fancy 的实现... Flink 把 Top-N 问题分成三种情况:
> >
> > 1. 数据只添加,不更新不删除(就像 batch mode)
> > 这种情况的实现是 AppendOnlyTopNFunction,就像你说的一样使用一个 Map
> > 来维护。不能直接使用堆来维护的原因是:因为要告知下游每一条记录的精确排名。
> >
> > 2. 数据可能有添加和更新
> > 这种情况的实现是 UpdatableTopNFunction,但是这个类开头的注释表明了它只能用于以下特殊情况:
> >     * 数据更新后排名只能变小不能变大;
> >     * 数据的 sort key 要 unique;
> >     * 不能删数据或者撤回数据。
> > 这种情况就避免了你上面说的排名变大,导致掉出 Top-N 的情况。还是可以用一个 Map 来维护。
> >
> > 3. 数据可以添加、更新和删除
> > 这种情况的实现是 RetractableTopNFunction。因为数据更新 / 删除后可能会掉出 Top-N,要找新数据补进来,那么只能从
> > state 里捞应该补进来的数据。当前由于社区没有 SortedMapState 的实现,现在是用
> ValueState<SortedMap<>> 存
> > state。每次读 state 都是把整个 state 拿出来读的,所以数据量大了其实没办法用... 等社区引入了 SortedMapState
> > 以后,就可以用 iterator 只读取前面一些我们想要补进来的数据。
> >
> > Tony Wei <[hidden email]> 于2019年7月11日周四 上午9:49写道:
> >
> > > Hi,
> > >
> > > 最近正在研究 Top K 的問題,在研究中找到了 Blink SQL 可以透過維護一個儲存 K 的最大紀錄的
> > > ”堆”來優化底下這類 SQL,不過我認為這只能針對 `score` 只會增加不減少的情況。
> > >
> > > > SELECT user_id, score
> > > > FROM (
> > > >   SELECT *,
> > > >     ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
> > > >   FROM user_scores)
> > > > WHERE row_num <= 3
> > > >
> > > >
> > > 我的問題是當如果這樣的計算是應用在流數據上,且 `score` 可能隨時間增加或是“減少”的話,例
> > > 如底下這類的 SQL,能有什麼樣的優化?
> > >
> > > > SELECT user_id, score
> > > > FROM (
> > > >   SELECT *,
> > > >     ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
> > > >   FROM (
> > > >       SELECT user_id, LAST_VAL(score) AS score
> > > >       FROM user_scores
> > > >       GROUP BY user_id))
> > > > WHERE row_num <= 3
> > > >
> > > > SQL 中的 `user_scores` 可以當作是從 DataStream 直接轉換過來的 Dynamic Table,
> > > `LAST_VAL`假設是一種 UDAF,可以挑出目前最新的值。所以,可以想像這張 table 的 user's
> > > `score` 是會隨時間變化增減。
> > >
> > > 上面所說堆的優化無法處理這樣的問題,底下舉個例子。假設今天有一個 top-3 的堆中已經存放
> > > 了三個使用者:A, B, C,各自的 scores 是:4, 3, 2,接下來收到了一個使用者 D 和他的分數是
> > > 1 的話,這個時候演算法會直接忽略掉 D,因為他不在 top-3 的範圍內。但是當下一個如果收到
> > > 的是一個更新 A 使用者的 score 為 0 的紀錄的話,這個時候理論上我們知道 top-3 會改為 B, C,
> > > D,但是在維護 top-3 的堆中我們無力找回被忽略的使用者 D。這樣的優化在 batch mode 是沒有
> > > 問題的,因為最新的 score 在有限的數據中會是固定的不動的。
> > >
> > > 不過當處理流數據,我目前只想到這種應用最終可能需要退回成存放全部使用者 scores 才有辦
> > > 法處理,才能隨時計算出正確的 top-k。所以我想請教各位大牛有沒有什麼樣的優化方式可以處
> > > 理這樣的問題,讓狀態不需要存到全部資料?當然這個問題不侷限在 SQL,如果有任何實作在
> > > DataStream 上的優化都是可接受。感謝大家幫忙。
> > >
> > > Best Regards,
> > > Tony Wei
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 關於如何在流數據上計算 Top K 的應用問題

Tony Wei
Hi Caizhi,

非常感謝你提供的資訊和講解,這對我幫助非常大。我會試著把這些知識應用到我們的案例中。

Best Regards,
Tony Wei

Caizhi Weng <[hidden email]> 於 2019年7月12日 週五 下午12:31寫道:

> Hi Tony!
>
> 這些資料都不需要在記憶體中實體化,所以不會受限記憶體的擴展性,只有那些需要被放入 Top-N 的資料會被讀取出來存放在一個 in-memory
> > 的堆中做為加速運算的優化。
>
>
> 前两种情况下,由于不需要从老数据中捞记录回 Top-N,state 里其实也只要放 Top-N 的数据。Top-N
> 的数据先在内存里维护,checkpoint 的时候同步到 state。
>
> 第三种情况下,你的说法是社区引入 SortedMapState 后可以实现的情况,现在由于暂时没有引入 SortedMapState,每次读
> state 还是会把所有数据都读出来(具体来说有两个 state,一个 ValueState 有序地存储了每个 sort key 的记录数量,另一个
> MapState 无序地根据 sort key 取出具体的记录列表),所以其实当前实现会受内存限制... 还是要等引入 SortedMapState
> 就可以不受内存限制了,但是每次从 state 里取数据仍然很耗时。
>
> 如果添加了一筆新的紀錄,可能會導致多個紀錄的排名需要加一或是減一,這部分是不是也需要遍歷整個 map 去判斷是否有增減,針對變動的部分通知下游?
>
>
> 是的,Flink 现在会把每一条有排名变化的数据通知下游(具体实现可参考 AppendOnlyTopNFunction 的
> processElementWithRowNumber 方法)。这个主要是为了 row_number()
> 等需要精确知道排名的情况准备的。如果只是单纯的 limit 3 这样的情况,只会通知下游进入和离开 Top-N
> 的那两条数据(processElementWithoutRowNumber 方法)。
>
> Tony Wei <[hidden email]> 于2019年7月12日周五 下午12:15写道:
>
> > Hi Caizhi,
> >
> > 謝謝你的回答。你的第三點想法給了我蠻大的啟發,我本來設想的情況是能否避免把全部使用者
> > 資料都存放在 state 來解決這個問題,但聽起來這部分是避免不了的。如果我沒有理解錯,你的
> > 作法比較像是將全部使用者的排名資訊都存放在 state,在使用了 rocksdb state backend 的狀況
> > 下,這些資料都不需要在記憶體中實體化,所以不會受限記憶體的擴展性,只有那些需要被放入
> > Top-N 的資料會被讀取出來存放在一個 in-memory 的堆中做為加速運算的優化。
> >
> > 在我們目前的應用場景中,精確排名不是必要的資訊,可能還有一些不是硬性的需求來鬆綁這個
> > 問題的限制,雖然沒有很有把握但或許可以根據你的想法實現一個專門針對我們應用情境的優化。
> >
> > 撇開上述特殊的情況,我另外好奇的是第一點中維護的 map state 要記錄精確的排名這件事的細
> > 節,想知道如果更新是循序變化的,如果添加了一筆新的紀錄,可能會導致多個紀錄的排名需要
> > 加一或是減一,這部分是不是也需要遍歷整個 map 去判斷是否有增減,針對變動的部分通知下
> > 游?
> >
> > Best Regards,
> > Tony Wei
> >
> > Caizhi Weng <[hidden email]> 於 2019年7月12日 週五 上午11:36寫道:
> >
> > > Hi Tony!
> > >
> > > 其实 Flink 对 Top-N 问题并没有很 fancy 的实现... Flink 把 Top-N 问题分成三种情况:
> > >
> > > 1. 数据只添加,不更新不删除(就像 batch mode)
> > > 这种情况的实现是 AppendOnlyTopNFunction,就像你说的一样使用一个 Map
> > > 来维护。不能直接使用堆来维护的原因是:因为要告知下游每一条记录的精确排名。
> > >
> > > 2. 数据可能有添加和更新
> > > 这种情况的实现是 UpdatableTopNFunction,但是这个类开头的注释表明了它只能用于以下特殊情况:
> > >     * 数据更新后排名只能变小不能变大;
> > >     * 数据的 sort key 要 unique;
> > >     * 不能删数据或者撤回数据。
> > > 这种情况就避免了你上面说的排名变大,导致掉出 Top-N 的情况。还是可以用一个 Map 来维护。
> > >
> > > 3. 数据可以添加、更新和删除
> > > 这种情况的实现是 RetractableTopNFunction。因为数据更新 / 删除后可能会掉出 Top-N,要找新数据补进来,那么只能从
> > > state 里捞应该补进来的数据。当前由于社区没有 SortedMapState 的实现,现在是用
> > ValueState<SortedMap<>> 存
> > > state。每次读 state 都是把整个 state 拿出来读的,所以数据量大了其实没办法用... 等社区引入了
> SortedMapState
> > > 以后,就可以用 iterator 只读取前面一些我们想要补进来的数据。
> > >
> > > Tony Wei <[hidden email]> 于2019年7月11日周四 上午9:49写道:
> > >
> > > > Hi,
> > > >
> > > > 最近正在研究 Top K 的問題,在研究中找到了 Blink SQL 可以透過維護一個儲存 K 的最大紀錄的
> > > > ”堆”來優化底下這類 SQL,不過我認為這只能針對 `score` 只會增加不減少的情況。
> > > >
> > > > > SELECT user_id, score
> > > > > FROM (
> > > > >   SELECT *,
> > > > >     ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
> > > > >   FROM user_scores)
> > > > > WHERE row_num <= 3
> > > > >
> > > > >
> > > > 我的問題是當如果這樣的計算是應用在流數據上,且 `score` 可能隨時間增加或是“減少”的話,例
> > > > 如底下這類的 SQL,能有什麼樣的優化?
> > > >
> > > > > SELECT user_id, score
> > > > > FROM (
> > > > >   SELECT *,
> > > > >     ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
> > > > >   FROM (
> > > > >       SELECT user_id, LAST_VAL(score) AS score
> > > > >       FROM user_scores
> > > > >       GROUP BY user_id))
> > > > > WHERE row_num <= 3
> > > > >
> > > > > SQL 中的 `user_scores` 可以當作是從 DataStream 直接轉換過來的 Dynamic Table,
> > > > `LAST_VAL`假設是一種 UDAF,可以挑出目前最新的值。所以,可以想像這張 table 的 user's
> > > > `score` 是會隨時間變化增減。
> > > >
> > > > 上面所說堆的優化無法處理這樣的問題,底下舉個例子。假設今天有一個 top-3 的堆中已經存放
> > > > 了三個使用者:A, B, C,各自的 scores 是:4, 3, 2,接下來收到了一個使用者 D 和他的分數是
> > > > 1 的話,這個時候演算法會直接忽略掉 D,因為他不在 top-3 的範圍內。但是當下一個如果收到
> > > > 的是一個更新 A 使用者的 score 為 0 的紀錄的話,這個時候理論上我們知道 top-3 會改為 B, C,
> > > > D,但是在維護 top-3 的堆中我們無力找回被忽略的使用者 D。這樣的優化在 batch mode 是沒有
> > > > 問題的,因為最新的 score 在有限的數據中會是固定的不動的。
> > > >
> > > > 不過當處理流數據,我目前只想到這種應用最終可能需要退回成存放全部使用者 scores 才有辦
> > > > 法處理,才能隨時計算出正確的 top-k。所以我想請教各位大牛有沒有什麼樣的優化方式可以處
> > > > 理這樣的問題,讓狀態不需要存到全部資料?當然這個問題不侷限在 SQL,如果有任何實作在
> > > > DataStream 上的優化都是可接受。感謝大家幫忙。
> > > >
> > > > Best Regards,
> > > > Tony Wei
> > > >
> > >
> >
>