关于使用Flink计算TopN的问题

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

关于使用Flink计算TopN的问题

董 加强
大家好:

最近需要使用Flink计算TopN碰到一些问题 不知道大家有没有遇到过 计算TopN的所使用的SQL语句是如下形式

create stream input table raw_log (
  country STRING,
  domain STRING,
  flux LONG,
  request LONG,
  rowtime AS ROWTIME(request, "2 SECOND")
) USING kafka (
  kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
  startingOffsets = earliest, subscribe = "input"
) ROW FORMAT JSON; create stream output table top_n_result USING kafka (
  kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
  topic = "output"
) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view window_log as
select
  TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart,
  country,
  domain,
  sum(flux) as flux
from
  raw_log
group by
  TUMBLE(rowtime, INTERVAL '2' SECOND),
  country,
  domain; insert into top_n_result
SELECT
  *
FROM
  (
    SELECT
      *,
      ROW_NUMBER() OVER (
        PARTITION BY wStart
        ORDER BY
          flux desc
      ) AS row_num
    FROM
      window_log
  )
WHERE
  row_num <= 10;

        就是前面是一个基于事件时间的窗口计算逻辑后面跟着一个TopN的计算逻辑 跑在Flink 1.9的blink上的 在TopN计算上先按窗口开始时间做分区然后排序输出Top结果 这里就产生了

一个状态管理的问题 因为窗口计算是不断向前的 也就是将窗口开始时间作为分区键会导致状态不断增大 后续在测试过程中发现其底层是实现为RetractableTopNFunction 然后在这个实现中没有发现状态清理的逻辑 而在

AppendOnlyTopNFunction和UpdatableTopNFunction中存在状态清理的逻辑 为什么要这么实现? 能否在RetractableTopNFunction中实现状态清理? 并且保证状态安全被删除?
Reply | Threaded
Open this post in threaded view
|

Re: 关于使用Flink计算TopN的问题

Jark
Administrator
Hi,

多谢反馈。 这应该是一个 mistake。 我创建了一个 issue 去跟踪这个问题。 https://issues.apache.org/jira/browse/FLINK-14119 <https://issues.apache.org/jira/browse/FLINK-14119>

> 在 2019年9月18日,16:58,董 加强 <[hidden email]> 写道:
>
> 大家好:
>
> 最近需要使用Flink计算TopN碰到一些问题 不知道大家有没有遇到过 计算TopN的所使用的SQL语句是如下形式
>
> create stream input table raw_log (
>  country STRING,
>  domain STRING,
>  flux LONG,
>  request LONG,
>  rowtime AS ROWTIME(request, "2 SECOND")
> ) USING kafka (
>  kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
>  startingOffsets = earliest, subscribe = "input"
> ) ROW FORMAT JSON; create stream output table top_n_result USING kafka (
>  kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
>  topic = "output"
> ) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view window_log as
> select
>  TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart,
>  country,
>  domain,
>  sum(flux) as flux
> from
>  raw_log
> group by
>  TUMBLE(rowtime, INTERVAL '2' SECOND),
>  country,
>  domain; insert into top_n_result
> SELECT
>  *
> FROM
>  (
>    SELECT
>      *,
>      ROW_NUMBER() OVER (
>        PARTITION BY wStart
>        ORDER BY
>          flux desc
>      ) AS row_num
>    FROM
>      window_log
>  )
> WHERE
>  row_num <= 10;
>
>        就是前面是一个基于事件时间的窗口计算逻辑后面跟着一个TopN的计算逻辑 跑在Flink 1.9的blink上的 在TopN计算上先按窗口开始时间做分区然后排序输出Top结果 这里就产生了
>
> 一个状态管理的问题 因为窗口计算是不断向前的 也就是将窗口开始时间作为分区键会导致状态不断增大 后续在测试过程中发现其底层是实现为RetractableTopNFunction 然后在这个实现中没有发现状态清理的逻辑 而在
>
> AppendOnlyTopNFunction和UpdatableTopNFunction中存在状态清理的逻辑 为什么要这么实现? 能否在RetractableTopNFunction中实现状态清理? 并且保证状态安全被删除?