大家好:
最近需要使用Flink计算TopN碰到一些问题 不知道大家有没有遇到过 计算TopN的所使用的SQL语句是如下形式 create stream input table raw_log ( country STRING, domain STRING, flux LONG, request LONG, rowtime AS ROWTIME(request, "2 SECOND") ) USING kafka ( kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}", startingOffsets = earliest, subscribe = "input" ) ROW FORMAT JSON; create stream output table top_n_result USING kafka ( kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}", topic = "output" ) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view window_log as select TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart, country, domain, sum(flux) as flux from raw_log group by TUMBLE(rowtime, INTERVAL '2' SECOND), country, domain; insert into top_n_result SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY wStart ORDER BY flux desc ) AS row_num FROM window_log ) WHERE row_num <= 10; 就是前面是一个基于事件时间的窗口计算逻辑后面跟着一个TopN的计算逻辑 跑在Flink 1.9的blink上的 在TopN计算上先按窗口开始时间做分区然后排序输出Top结果 这里就产生了 一个状态管理的问题 因为窗口计算是不断向前的 也就是将窗口开始时间作为分区键会导致状态不断增大 后续在测试过程中发现其底层是实现为RetractableTopNFunction 然后在这个实现中没有发现状态清理的逻辑 而在 AppendOnlyTopNFunction和UpdatableTopNFunction中存在状态清理的逻辑 为什么要这么实现? 能否在RetractableTopNFunction中实现状态清理? 并且保证状态安全被删除? |
Administrator
|
Hi,
多谢反馈。 这应该是一个 mistake。 我创建了一个 issue 去跟踪这个问题。 https://issues.apache.org/jira/browse/FLINK-14119 <https://issues.apache.org/jira/browse/FLINK-14119> > 在 2019年9月18日,16:58,董 加强 <[hidden email]> 写道: > > 大家好: > > 最近需要使用Flink计算TopN碰到一些问题 不知道大家有没有遇到过 计算TopN的所使用的SQL语句是如下形式 > > create stream input table raw_log ( > country STRING, > domain STRING, > flux LONG, > request LONG, > rowtime AS ROWTIME(request, "2 SECOND") > ) USING kafka ( > kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}", > startingOffsets = earliest, subscribe = "input" > ) ROW FORMAT JSON; create stream output table top_n_result USING kafka ( > kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}", > topic = "output" > ) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view window_log as > select > TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart, > country, > domain, > sum(flux) as flux > from > raw_log > group by > TUMBLE(rowtime, INTERVAL '2' SECOND), > country, > domain; insert into top_n_result > SELECT > * > FROM > ( > SELECT > *, > ROW_NUMBER() OVER ( > PARTITION BY wStart > ORDER BY > flux desc > ) AS row_num > FROM > window_log > ) > WHERE > row_num <= 10; > > 就是前面是一个基于事件时间的窗口计算逻辑后面跟着一个TopN的计算逻辑 跑在Flink 1.9的blink上的 在TopN计算上先按窗口开始时间做分区然后排序输出Top结果 这里就产生了 > > 一个状态管理的问题 因为窗口计算是不断向前的 也就是将窗口开始时间作为分区键会导致状态不断增大 后续在测试过程中发现其底层是实现为RetractableTopNFunction 然后在这个实现中没有发现状态清理的逻辑 而在 > > AppendOnlyTopNFunction和UpdatableTopNFunction中存在状态清理的逻辑 为什么要这么实现? 能否在RetractableTopNFunction中实现状态清理? 并且保证状态安全被删除? |
Free forum by Nabble | Edit this page |