回复: 求助:FLINKSQL1.10实时统计累计UV

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

回复: 求助:FLINKSQL1.10实时统计累计UV

x_gothicist
版本是1.10.1,最后sink的时候确实是一个window里面做count distinct操作。请问是只要计算过程中含有一个window里面做count distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,group DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这个sql对应的状态很大。代码如下:
val rt_totaluv_view : Table = tabEnv.sqlQuery(
  """
    SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv
    FROM source
    GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
    """)
tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)

val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
  .filter( line => line._1 == true ).map( line => line._2 )

val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )

tabEnv.sqlUpdate(
  s"""
    INSERT INTO mysql_totaluv
    SELECT _1,MAX(_2)
    FROM $totaluvTabTmp
    GROUP BY _1
    """)
------------------ 原始邮件 ------------------
发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
发送时间:&nbsp;2020年7月3日(星期五) 晚上9:47
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV



你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
这个已经在1.11中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-17942

x <[hidden email]&gt; 于2020年7月3日周五 下午4:34写道:

&gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
&gt;
&gt; 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Jark Wu"<[hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年6月18日(星期四) 中午12:16
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
&gt;
&gt;
&gt;
&gt; 是的,我觉得这样子是能绕过的。
&gt;
&gt; On Thu, 18 Jun 2020 at 10:34, x <[hidden email]&amp;gt; wrote:
&gt;
&gt; &amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
&gt; &amp;gt; val resTmpTab: Table = tabEnv.sqlQuery(
&gt; &amp;gt;&amp;nbsp;&amp;nbsp; """
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd
&gt; HH:mm:00'))
&gt; &amp;gt; time_str,COUNT(DISTINCT userkey) uv
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; FROM user_behavior&amp;nbsp;&amp;nbsp;&amp;nbsp; GROUP BY
&gt; DATE_FORMAT(ts, 'yyyy-MM-dd')&amp;nbsp;&amp;nbsp;&amp;nbsp; """)
&gt; &amp;gt;
&gt; &amp;gt; val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;
&gt; .filter(line=&amp;amp;gt;line._1==true).map(line=&amp;amp;gt;line._2)
&gt; &amp;gt;
&gt; &amp;gt; val res= tabEnv.fromDataStream(resTmpStream)
&gt; &amp;gt; tabEnv.sqlUpdate(
&gt; &amp;gt;&amp;nbsp;&amp;nbsp; s"""
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; INSERT INTO rt_totaluv
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; SELECT _1,MAX(_2)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; FROM $res
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; GROUP BY _1
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; """)
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
&gt; &amp;gt; 发件人:&amp;amp;nbsp;"Jark Wu"<[hidden email]&amp;amp;gt;;
&gt; &amp;gt; 发送时间:&amp;amp;nbsp;2020年6月17日(星期三) 中午1:55
&gt; &amp;gt; 收件人:&amp;amp;nbsp;"user-zh"<[hidden email]&amp;amp;gt;;
&gt; &amp;gt;
&gt; &amp;gt; 主题:&amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 在 Flink 1.11 中,你可以尝试这样:
&gt; &amp;gt;
&gt; &amp;gt; CREATE TABLE mysql (
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; time_str STRING,
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; uv BIGINT,
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; PRIMARY KEY (ts) NOT ENFORCED
&gt; &amp;gt; ) WITH (
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; 'connector' = 'jdbc',
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; 'table-name' = 'myuv'
&gt; &amp;gt; );
&gt; &amp;gt;
&gt; &amp;gt; INSERT INTO mysql
&gt; &amp;gt; SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')),
&gt; COUNT(DISTINCT&amp;amp;nbsp;
&gt; &amp;gt; user_id)
&gt; &amp;gt; FROM user_behavior;
&gt; &amp;gt;
&gt; &amp;gt; On Wed, 17 Jun 2020 at 13:49, x <[hidden email]&amp;amp;gt; wrote:
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
&gt; &amp;gt; &amp;amp;gt; sink表这个样式
&gt; &amp;gt; &amp;amp;gt; tm uv
&gt; &amp;gt; &amp;amp;gt; 2020/06/17 13:46:00 10000
&gt; &amp;gt; &amp;amp;gt; 2020/06/17 13:47:00 20000
&gt; &amp;gt; &amp;amp;gt; 2020/06/17 13:48:00 30000
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; group by 日期的话,分钟如何获取
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; ------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------
&gt; &amp;gt; &amp;amp;gt; 发件人:&amp;amp;amp;nbsp;"Benchao Li"<[hidden email]
&gt; &amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; 发送时间:&amp;amp;amp;nbsp;2020年6月17日(星期三) 中午11:46
&gt; &amp;gt; &amp;amp;gt; 收件人:&amp;amp;amp;nbsp;"user-zh"<[hidden email]
&gt; &amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; 主题:&amp;amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; Hi,
&gt; &amp;gt; &amp;amp;gt; 我感觉这种场景可以有两种方式,
&gt; &amp;gt; &amp;amp;gt; 1. 可以直接用group by + mini batch
&gt; &amp;gt; &amp;amp;gt; 2. window聚合 + fast emit
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm,
&gt; 'yyyy-MM-dd')。
&gt; &amp;gt; &amp;amp;gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini
&gt; batch的开启也需要
&gt; &amp;gt; &amp;amp;gt; 用参数[2] 来打开。
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
&gt; &amp;gt; &amp;amp;gt; fast
&gt; emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
&gt; &amp;gt; &amp;amp;gt; table.exec.emit.early-fire.enabled = true
&gt; &amp;gt; &amp;amp;gt; table.exec.emit.early-fire.delay = 60 s
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; [1]
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
&gt; &amp;gt; &amp;amp;gt; [2]
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; x <[hidden email]&amp;amp;amp;gt; 于2020年6月17日周三 上午11:14写道:
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; CREATE VIEW uv_per_10min AS
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; SELECT&amp;amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;nbsp;
&gt; MAX(DATE_FORMAT(proctime&amp;amp;amp;amp;nbsp;,
&gt; &amp;gt; 'yyyy-MM-dd
&gt; &amp;gt; &amp;amp;gt; HH:mm:00'))&amp;amp;amp;amp;nbsp;OVER w
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; AS time_str,&amp;amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;nbsp; COUNT(DISTINCT user_id) OVER
&gt; w AS uv
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; FROM user_behavior
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN
&gt; UNBOUNDED
&gt; &amp;gt; PRECEDING AND
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; CURRENT ROW);
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 想请教一下,应该如何处理?
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
&gt; &amp;gt; 这样可以吗,另外状态应该如何清理?
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 多谢



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 求助:FLINKSQL1.10实时统计累计UV

Benchao Li-2
我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。
这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

x <[hidden email]> 于2020年7月6日周一 上午11:15写道:

> 版本是1.10.1,最后sink的时候确实是一个window里面做count
> distinct操作。请问是只要计算过程中含有一个window里面做count
> distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,group&nbsp;DATE_FORMAT(rowtm,
> 'yyyy-MM-dd') 这个sql对应的状态很大。代码如下:
> val rt_totaluv_view : Table = tabEnv.sqlQuery(
>   """
>     SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd HH:mm:00'))
> time_str,COUNT(DISTINCT userkey) uv
>     FROM source
>     GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
>     """)
> tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
>
> val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
>   .filter( line =&gt; line._1 == true ).map( line =&gt; line._2 )
>
> val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
>
> tabEnv.sqlUpdate(
>   s"""
>     INSERT INTO mysql_totaluv
>     SELECT _1,MAX(_2)
>     FROM $totaluvTabTmp
>     GROUP BY _1
>     """)
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年7月3日(星期五) 晚上9:47
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
> 这个已经在1.11中修复了。
>
> [1] https://issues.apache.org/jira/browse/FLINK-17942
>
> x <[hidden email]&gt; 于2020年7月3日周五 下午4:34写道:
>
> &gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
> &gt;
> &gt;
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:&amp;nbsp;"Jark Wu"<[hidden email]&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2020年6月18日(星期四) 中午12:16
> &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> &gt;
> &gt;
> &gt;
> &gt; 是的,我觉得这样子是能绕过的。
> &gt;
> &gt; On Thu, 18 Jun 2020 at 10:34, x <[hidden email]&amp;gt; wrote:
> &gt;
> &gt; &amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
> &gt; &amp;gt; val resTmpTab: Table = tabEnv.sqlQuery(
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp; """
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; SELECT
> MAX(DATE_FORMAT(ts, 'yyyy-MM-dd
> &gt; HH:mm:00'))
> &gt; &amp;gt; time_str,COUNT(DISTINCT userkey) uv
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; FROM
> user_behavior&amp;nbsp;&amp;nbsp;&amp;nbsp; GROUP BY
> &gt; DATE_FORMAT(ts, 'yyyy-MM-dd')&amp;nbsp;&amp;nbsp;&amp;nbsp; """)
> &gt; &amp;gt;
> &gt; &amp;gt; val
> resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;
> &gt; .filter(line=&amp;amp;gt;line._1==true).map(line=&amp;amp;gt;line._2)
> &gt; &amp;gt;
> &gt; &amp;gt; val res= tabEnv.fromDataStream(resTmpStream)
> &gt; &amp;gt; tabEnv.sqlUpdate(
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp; s"""
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; INSERT INTO
> rt_totaluv
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; SELECT _1,MAX(_2)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; FROM $res
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; GROUP BY _1
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; """)
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
> &gt; &amp;gt; 发件人:&amp;amp;nbsp;"Jark Wu"<[hidden email]&amp;amp;gt;;
> &gt; &amp;gt; 发送时间:&amp;amp;nbsp;2020年6月17日(星期三) 中午1:55
> &gt; &amp;gt; 收件人:&amp;amp;nbsp;"user-zh"<[hidden email]
> &amp;amp;gt;;
> &gt; &amp;gt;
> &gt; &amp;gt; 主题:&amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; 在 Flink 1.11 中,你可以尝试这样:
> &gt; &amp;gt;
> &gt; &amp;gt; CREATE TABLE mysql (
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; time_str STRING,
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; uv BIGINT,
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; PRIMARY KEY (ts) NOT ENFORCED
> &gt; &amp;gt; ) WITH (
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; 'connector' = 'jdbc',
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; 'url' =
> 'jdbc:mysql://localhost:3306/mydatabase',
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; 'table-name' = 'myuv'
> &gt; &amp;gt; );
> &gt; &amp;gt;
> &gt; &amp;gt; INSERT INTO mysql
> &gt; &amp;gt; SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')),
> &gt; COUNT(DISTINCT&amp;amp;nbsp;
> &gt; &amp;gt; user_id)
> &gt; &amp;gt; FROM user_behavior;
> &gt; &amp;gt;
> &gt; &amp;gt; On Wed, 17 Jun 2020 at 13:49, x <[hidden email]&amp;amp;gt;
> wrote:
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
> &gt; &amp;gt; &amp;amp;gt; sink表这个样式
> &gt; &amp;gt; &amp;amp;gt; tm uv
> &gt; &amp;gt; &amp;amp;gt; 2020/06/17 13:46:00 10000
> &gt; &amp;gt; &amp;amp;gt; 2020/06/17 13:47:00 20000
> &gt; &amp;gt; &amp;amp;gt; 2020/06/17 13:48:00 30000
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; group by 日期的话,分钟如何获取
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt;
> ------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------
> &gt; &amp;gt; &amp;amp;gt; 发件人:&amp;amp;amp;nbsp;"Benchao Li"<
> [hidden email]
> &gt; &amp;amp;amp;gt;;
> &gt; &amp;gt; &amp;amp;gt; 发送时间:&amp;amp;amp;nbsp;2020年6月17日(星期三) 中午11:46
> &gt; &amp;gt; &amp;amp;gt; 收件人:&amp;amp;amp;nbsp;"user-zh"<
> [hidden email]
> &gt; &amp;amp;amp;gt;;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; 主题:&amp;amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; Hi,
> &gt; &amp;gt; &amp;amp;gt; 我感觉这种场景可以有两种方式,
> &gt; &amp;gt; &amp;amp;gt; 1. 可以直接用group by + mini batch
> &gt; &amp;gt; &amp;amp;gt; 2. window聚合 + fast emit
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; 对于#1,group
> by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm,
> &gt; 'yyyy-MM-dd')。
> &gt; &amp;gt; &amp;amp;gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1]
> 。同时,mini
> &gt; batch的开启也需要
> &gt; &amp;gt; &amp;amp;gt; 用参数[2] 来打开。
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> &gt; &amp;gt; &amp;amp;gt; fast
> &gt; emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> &gt; &amp;gt; &amp;amp;gt; table.exec.emit.early-fire.enabled = true
> &gt; &amp;gt; &amp;amp;gt; table.exec.emit.early-fire.delay = 60 s
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; [1]
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt;
> &gt;
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> &gt; &amp;gt; &amp;amp;gt; [2]
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt;
> &gt;
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; x <[hidden email]&amp;amp;amp;gt;
> 于2020年6月17日周三 上午11:14写道:
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; CREATE VIEW uv_per_10min AS
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; SELECT&amp;amp;amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;nbsp;
> &gt; MAX(DATE_FORMAT(proctime&amp;amp;amp;amp;nbsp;,
> &gt; &amp;gt; 'yyyy-MM-dd
> &gt; &amp;gt; &amp;amp;gt; HH:mm:00'))&amp;amp;amp;amp;nbsp;OVER w
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; AS
> time_str,&amp;amp;amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;nbsp;
> COUNT(DISTINCT user_id) OVER
> &gt; w AS uv
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; FROM user_behavior
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; WINDOW w AS (ORDER BY proctime
> ROWS BETWEEN
> &gt; UNBOUNDED
> &gt; &amp;gt; PRECEDING AND
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; CURRENT ROW);
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 想请教一下,应该如何处理?
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; PARTITION BY
> DATE_FORMAT(rowtm, 'yyyy-MM-dd')
> &gt; &amp;gt; 这样可以吗,另外状态应该如何清理?
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; PS:1.10貌似不支持DDL貌似不支持CREATE
> VIEW吧
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 多谢
>
>
>
> --
>
> Best,
> Benchao Li



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

回复: 求助:FLINKSQL1.10实时统计累计UV

x_gothicist
sorry,我说错了,确实没有,都是group agg.
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),但是状态还是越来越大,没有按既定配置自动清理.


------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
发送时间:&nbsp;2020年7月6日(星期一) 中午12:52
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV



我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。
这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

x <[hidden email]&gt; 于2020年7月6日周一 上午11:15写道:

&gt; 版本是1.10.1,最后sink的时候确实是一个window里面做count
&gt; distinct操作。请问是只要计算过程中含有一个window里面做count
&gt; distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,group&amp;nbsp;DATE_FORMAT(rowtm,
&gt; 'yyyy-MM-dd') 这个sql对应的状态很大。代码如下:
&gt; val rt_totaluv_view : Table = tabEnv.sqlQuery(
&gt;&nbsp;&nbsp; """
&gt;&nbsp;&nbsp;&nbsp;&nbsp; SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd HH:mm:00'))
&gt; time_str,COUNT(DISTINCT userkey) uv
&gt;&nbsp;&nbsp;&nbsp;&nbsp; FROM source
&gt;&nbsp;&nbsp;&nbsp;&nbsp; GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
&gt;&nbsp;&nbsp;&nbsp;&nbsp; """)
&gt; tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
&gt;
&gt; val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
&gt;&nbsp;&nbsp; .filter( line =&amp;gt; line._1 == true ).map( line =&amp;gt; line._2 )
&gt;
&gt; val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
&gt;
&gt; tabEnv.sqlUpdate(
&gt;&nbsp;&nbsp; s"""
&gt;&nbsp;&nbsp;&nbsp;&nbsp; INSERT INTO mysql_totaluv
&gt;&nbsp;&nbsp;&nbsp;&nbsp; SELECT _1,MAX(_2)
&gt;&nbsp;&nbsp;&nbsp;&nbsp; FROM $totaluvTabTmp
&gt;&nbsp;&nbsp;&nbsp;&nbsp; GROUP BY _1
&gt;&nbsp;&nbsp;&nbsp;&nbsp; """)
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年7月3日(星期五) 晚上9:47
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
&gt;
&gt;
&gt;
&gt; 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
&gt; 这个已经在1.11中修复了。
&gt;
&gt; [1] https://issues.apache.org/jira/browse/FLINK-17942
&gt;
&gt; x <[hidden email]&amp;gt; 于2020年7月3日周五 下午4:34写道:
&gt;
&gt; &amp;gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
&gt; &amp;gt;
&gt; &amp;gt;
&gt; 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
&gt; &amp;gt; 发件人:&amp;amp;nbsp;"Jark Wu"<[hidden email]&amp;amp;gt;;
&gt; &amp;gt; 发送时间:&amp;amp;nbsp;2020年6月18日(星期四) 中午12:16
&gt; &amp;gt; 收件人:&amp;amp;nbsp;"user-zh"<[hidden email]&amp;amp;gt;;
&gt; &amp;gt;
&gt; &amp;gt; 主题:&amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 是的,我觉得这样子是能绕过的。
&gt; &amp;gt;
&gt; &amp;gt; On Thu, 18 Jun 2020 at 10:34, x <[hidden email]&amp;amp;gt; wrote:
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
&gt; &amp;gt; &amp;amp;gt; val resTmpTab: Table = tabEnv.sqlQuery(
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp; """
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; SELECT
&gt; MAX(DATE_FORMAT(ts, 'yyyy-MM-dd
&gt; &amp;gt; HH:mm:00'))
&gt; &amp;gt; &amp;amp;gt; time_str,COUNT(DISTINCT userkey) uv
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; FROM
&gt; user_behavior&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; GROUP BY
&gt; &amp;gt; DATE_FORMAT(ts, 'yyyy-MM-dd')&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; """)
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; val
&gt; resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; &amp;gt; .filter(line=&amp;amp;amp;gt;line._1==true).map(line=&amp;amp;amp;gt;line._2)
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; val res= tabEnv.fromDataStream(resTmpStream)
&gt; &amp;gt; &amp;amp;gt; tabEnv.sqlUpdate(
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp; s"""
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; INSERT INTO
&gt; rt_totaluv
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; SELECT _1,MAX(_2)
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; FROM $res
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; GROUP BY _1
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; """)
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; ------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------
&gt; &amp;gt; &amp;amp;gt; 发件人:&amp;amp;amp;nbsp;"Jark Wu"<[hidden email]&amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; 发送时间:&amp;amp;amp;nbsp;2020年6月17日(星期三) 中午1:55
&gt; &amp;gt; &amp;amp;gt; 收件人:&amp;amp;amp;nbsp;"user-zh"<[hidden email]
&gt; &amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; 主题:&amp;amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; 在 Flink 1.11 中,你可以尝试这样:
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; CREATE TABLE mysql (
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; time_str STRING,
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; uv BIGINT,
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; PRIMARY KEY (ts) NOT ENFORCED
&gt; &amp;gt; &amp;amp;gt; ) WITH (
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 'connector' = 'jdbc',
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 'url' =
&gt; 'jdbc:mysql://localhost:3306/mydatabase',
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 'table-name' = 'myuv'
&gt; &amp;gt; &amp;amp;gt; );
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; INSERT INTO mysql
&gt; &amp;gt; &amp;amp;gt; SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')),
&gt; &amp;gt; COUNT(DISTINCT&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; user_id)
&gt; &amp;gt; &amp;amp;gt; FROM user_behavior;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; On Wed, 17 Jun 2020 at 13:49, x <[hidden email]&amp;amp;amp;gt;
&gt; wrote:
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; sink表这个样式
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; tm uv
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:46:00 10000
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:47:00 20000
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:48:00 30000
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; group by 日期的话,分钟如何获取
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt;
&gt; ------------------&amp;amp;amp;amp;nbsp;原始邮件&amp;amp;amp;amp;nbsp;------------------
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 发件人:&amp;amp;amp;amp;nbsp;"Benchao Li"<
&gt; [hidden email]
&gt; &amp;gt; &amp;amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 发送时间:&amp;amp;amp;amp;nbsp;2020年6月17日(星期三) 中午11:46
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 收件人:&amp;amp;amp;amp;nbsp;"user-zh"<
&gt; [hidden email]
&gt; &amp;gt; &amp;amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 主题:&amp;amp;amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; Hi,
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 我感觉这种场景可以有两种方式,
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 1. 可以直接用group by + mini batch
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2. window聚合 + fast emit
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 对于#1,group
&gt; by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm,
&gt; &amp;gt; 'yyyy-MM-dd')。
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1]
&gt; 。同时,mini
&gt; &amp;gt; batch的开启也需要
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 用参数[2] 来打开。
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; fast
&gt; &amp;gt; emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; table.exec.emit.early-fire.enabled = true
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; table.exec.emit.early-fire.delay = 60 s
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; [1]
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; [2]
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; x <[hidden email]&amp;amp;amp;amp;gt;
&gt; 于2020年6月17日周三 上午11:14写道:
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; CREATE VIEW uv_per_10min AS
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; SELECT&amp;amp;amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;nbsp;
&gt; &amp;gt; MAX(DATE_FORMAT(proctime&amp;amp;amp;amp;amp;nbsp;,
&gt; &amp;gt; &amp;amp;gt; 'yyyy-MM-dd
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; HH:mm:00'))&amp;amp;amp;amp;amp;nbsp;OVER w
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; AS
&gt; time_str,&amp;amp;amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;nbsp;
&gt; COUNT(DISTINCT user_id) OVER
&gt; &amp;gt; w AS uv
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; FROM user_behavior
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; WINDOW w AS (ORDER BY proctime
&gt; ROWS BETWEEN
&gt; &amp;gt; UNBOUNDED
&gt; &amp;gt; &amp;amp;gt; PRECEDING AND
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; CURRENT ROW);
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 想请教一下,应该如何处理?
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; PARTITION BY
&gt; DATE_FORMAT(rowtm, 'yyyy-MM-dd')
&gt; &amp;gt; &amp;amp;gt; 这样可以吗,另外状态应该如何清理?
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; PS:1.10貌似不支持DDL貌似不支持CREATE
&gt; VIEW吧
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 多谢
&gt;
&gt;
&gt;
&gt; --
&gt;
&gt; Best,
&gt; Benchao Li



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 求助:FLINKSQL1.10实时统计累计UV

Benchao Li-2
感觉不太应该有这种情况,你用的是blink planner么?

x <[hidden email]> 于2020年7月6日周一 下午1:24写道:

> sorry,我说错了,确实没有,都是group agg.
>
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),但是状态还是越来越大,没有按既定配置自动清理.
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年7月6日(星期一) 中午12:52
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。
> 这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> x <[hidden email]&gt; 于2020年7月6日周一 上午11:15写道:
>
> &gt; 版本是1.10.1,最后sink的时候确实是一个window里面做count
> &gt; distinct操作。请问是只要计算过程中含有一个window里面做count
> &gt;
> distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,group&amp;nbsp;DATE_FORMAT(rowtm,
> &gt; 'yyyy-MM-dd') 这个sql对应的状态很大。代码如下:
> &gt; val rt_totaluv_view : Table = tabEnv.sqlQuery(
> &gt;&nbsp;&nbsp; """
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd
> HH:mm:00'))
> &gt; time_str,COUNT(DISTINCT userkey) uv
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; FROM source
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; """)
> &gt; tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
> &gt;
> &gt; val totaluvTmp =
> tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
> &gt;&nbsp;&nbsp; .filter( line =&amp;gt; line._1 == true ).map( line
> =&amp;gt; line._2 )
> &gt;
> &gt; val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
> &gt;
> &gt; tabEnv.sqlUpdate(
> &gt;&nbsp;&nbsp; s"""
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; INSERT INTO mysql_totaluv
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; SELECT _1,MAX(_2)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; FROM $totaluvTabTmp
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; GROUP BY _1
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; """)
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2020年7月3日(星期五) 晚上9:47
> &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> &gt;
> &gt;
> &gt;
> &gt; 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
> &gt; 这个已经在1.11中修复了。
> &gt;
> &gt; [1] https://issues.apache.org/jira/browse/FLINK-17942
> &gt;
> &gt; x <[hidden email]&amp;gt; 于2020年7月3日周五 下午4:34写道:
> &gt;
> &gt; &amp;gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt;
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
> &gt; &amp;gt; 发件人:&amp;amp;nbsp;"Jark Wu"<[hidden email]&amp;amp;gt;;
> &gt; &amp;gt; 发送时间:&amp;amp;nbsp;2020年6月18日(星期四) 中午12:16
> &gt; &amp;gt; 收件人:&amp;amp;nbsp;"user-zh"<[hidden email]
> &amp;amp;gt;;
> &gt; &amp;gt;
> &gt; &amp;gt; 主题:&amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; 是的,我觉得这样子是能绕过的。
> &gt; &amp;gt;
> &gt; &amp;gt; On Thu, 18 Jun 2020 at 10:34, x <[hidden email]&amp;amp;gt;
> wrote:
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
> &gt; &amp;gt; &amp;amp;gt; val resTmpTab: Table = tabEnv.sqlQuery(
> &gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp; """
> &gt; &amp;gt;
> &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; SELECT
> &gt; MAX(DATE_FORMAT(ts, 'yyyy-MM-dd
> &gt; &amp;gt; HH:mm:00'))
> &gt; &amp;gt; &amp;amp;gt; time_str,COUNT(DISTINCT userkey) uv
> &gt; &amp;gt;
> &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; FROM
> &gt; user_behavior&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; GROUP BY
> &gt; &amp;gt; DATE_FORMAT(ts,
> 'yyyy-MM-dd')&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; """)
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; val
> &gt; resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
> &gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;
> &gt; &amp;gt;
> .filter(line=&amp;amp;amp;gt;line._1==true).map(line=&amp;amp;amp;gt;line._2)
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; val res= tabEnv.fromDataStream(resTmpStream)
> &gt; &amp;gt; &amp;amp;gt; tabEnv.sqlUpdate(
> &gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp; s"""
> &gt; &amp;gt;
> &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; INSERT
> INTO
> &gt; rt_totaluv
> &gt; &amp;gt;
> &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; SELECT
> _1,MAX(_2)
> &gt; &amp;gt;
> &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; FROM
> $res
> &gt; &amp;gt;
> &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; GROUP
> BY _1
> &gt; &amp;gt;
> &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; """)
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt;
> ------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------
> &gt; &amp;gt; &amp;amp;gt; 发件人:&amp;amp;amp;nbsp;"Jark Wu"<
> [hidden email]&amp;amp;amp;gt;;
> &gt; &amp;gt; &amp;amp;gt; 发送时间:&amp;amp;amp;nbsp;2020年6月17日(星期三) 中午1:55
> &gt; &amp;gt; &amp;amp;gt; 收件人:&amp;amp;amp;nbsp;"user-zh"<
> [hidden email]
> &gt; &amp;amp;amp;gt;;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; 主题:&amp;amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; 在 Flink 1.11 中,你可以尝试这样:
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; CREATE TABLE mysql (
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; time_str
> STRING,
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; uv BIGINT,
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; PRIMARY
> KEY (ts) NOT ENFORCED
> &gt; &amp;gt; &amp;amp;gt; ) WITH (
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 'connector' = 'jdbc',
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 'url' =
> &gt; 'jdbc:mysql://localhost:3306/mydatabase',
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 'table-name' = 'myuv'
> &gt; &amp;gt; &amp;amp;gt; );
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; INSERT INTO mysql
> &gt; &amp;gt; &amp;amp;gt; SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd
> HH:mm:00')),
> &gt; &amp;gt; COUNT(DISTINCT&amp;amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;gt; user_id)
> &gt; &amp;gt; &amp;amp;gt; FROM user_behavior;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; On Wed, 17 Jun 2020 at 13:49, x <
> [hidden email]&amp;amp;amp;gt;
> &gt; wrote:
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; sink表这个样式
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; tm uv
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:46:00 10000
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:47:00 20000
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:48:00 30000
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; group by 日期的话,分钟如何获取
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt;
> &gt;
> ------------------&amp;amp;amp;amp;nbsp;原始邮件&amp;amp;amp;amp;nbsp;------------------
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> 发件人:&amp;amp;amp;amp;nbsp;"Benchao Li"<
> &gt; [hidden email]
> &gt; &amp;gt; &amp;amp;amp;amp;gt;;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> 发送时间:&amp;amp;amp;amp;nbsp;2020年6月17日(星期三) 中午11:46
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> 收件人:&amp;amp;amp;amp;nbsp;"user-zh"<
> &gt; [hidden email]
> &gt; &amp;gt; &amp;amp;amp;amp;gt;;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 主题:&amp;amp;amp;amp;nbsp;Re:
> 求助:FLINKSQL1.10实时统计累计UV
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; Hi,
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 我感觉这种场景可以有两种方式,
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 1. 可以直接用group by + mini batch
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2. window聚合 + fast emit
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 对于#1,group
> &gt; by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm,
> &gt; &amp;gt; 'yyyy-MM-dd')。
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 这种情况下的状态清理,需要配置state
> retention时间,配置方法可以参考[1]
> &gt; 。同时,mini
> &gt; &amp;gt; batch的开启也需要
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 用参数[2] 来打开。
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; fast
> &gt; &amp;gt;
> emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> table.exec.emit.early-fire.enabled = true
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> table.exec.emit.early-fire.delay = 60 s
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; [1]
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt;
> &gt;
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; [2]
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt;
> &gt;
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; x <[hidden email]
> &amp;amp;amp;amp;gt;
> &gt; 于2020年6月17日周三 上午11:14写道:
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
> &gt; &amp;gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; CREATE
> VIEW uv_per_10min AS
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
> SELECT&amp;amp;amp;amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
> &amp;amp;amp;amp;amp;nbsp;
> &gt; &amp;gt; MAX(DATE_FORMAT(proctime&amp;amp;amp;amp;amp;nbsp;,
> &gt; &amp;gt; &amp;amp;gt; 'yyyy-MM-dd
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
> HH:mm:00'))&amp;amp;amp;amp;amp;nbsp;OVER w
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; AS
> &gt; time_str,&amp;amp;amp;amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
> &amp;amp;amp;amp;amp;nbsp;
> &gt; COUNT(DISTINCT user_id) OVER
> &gt; &amp;gt; w AS uv
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; FROM
> user_behavior
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; WINDOW w
> AS (ORDER BY proctime
> &gt; ROWS BETWEEN
> &gt; &amp;gt; UNBOUNDED
> &gt; &amp;gt; &amp;amp;gt; PRECEDING AND
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; CURRENT
> ROW);
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
> 想请教一下,应该如何处理?
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; PARTITION
> BY
> &gt; DATE_FORMAT(rowtm, 'yyyy-MM-dd')
> &gt; &amp;gt; &amp;amp;gt; 这样可以吗,另外状态应该如何清理?
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
> PS:1.10貌似不支持DDL貌似不支持CREATE
> &gt; VIEW吧
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 多谢
> &gt;
> &gt;
> &gt;
> &gt; --
> &gt;
> &gt; Best,
> &gt; Benchao Li
>
>
>
> --
>
> Best,
> Benchao Li



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

回复: 求助:FLINKSQL1.10实时统计累计UV

x_gothicist
是blinkval setttings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()





------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
发送时间:&nbsp;2020年7月6日(星期一) 晚上11:11
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV



感觉不太应该有这种情况,你用的是blink planner么?

x <[hidden email]&gt; 于2020年7月6日周一 下午1:24写道:

&gt; sorry,我说错了,确实没有,都是group agg.
&gt;
&gt; 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),但是状态还是越来越大,没有按既定配置自动清理.
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年7月6日(星期一) 中午12:52
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
&gt;
&gt;
&gt;
&gt; 我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。
&gt; 这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。
&gt;
&gt; [1]
&gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
&gt;
&gt; x <[hidden email]&amp;gt; 于2020年7月6日周一 上午11:15写道:
&gt;
&gt; &amp;gt; 版本是1.10.1,最后sink的时候确实是一个window里面做count
&gt; &amp;gt; distinct操作。请问是只要计算过程中含有一个window里面做count
&gt; &amp;gt;
&gt; distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,group&amp;amp;nbsp;DATE_FORMAT(rowtm,
&gt; &amp;gt; 'yyyy-MM-dd') 这个sql对应的状态很大。代码如下:
&gt; &amp;gt; val rt_totaluv_view : Table = tabEnv.sqlQuery(
&gt; &amp;gt;&amp;nbsp;&amp;nbsp; """
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd
&gt; HH:mm:00'))
&gt; &amp;gt; time_str,COUNT(DISTINCT userkey) uv
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; FROM source
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; """)
&gt; &amp;gt; tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
&gt; &amp;gt;
&gt; &amp;gt; val totaluvTmp =
&gt; tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp; .filter( line =&amp;amp;gt; line._1 == true ).map( line
&gt; =&amp;amp;gt; line._2 )
&gt; &amp;gt;
&gt; &amp;gt; val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
&gt; &amp;gt;
&gt; &amp;gt; tabEnv.sqlUpdate(
&gt; &amp;gt;&amp;nbsp;&amp;nbsp; s"""
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; INSERT INTO mysql_totaluv
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; SELECT _1,MAX(_2)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; FROM $totaluvTabTmp
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; GROUP BY _1
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; """)
&gt; &amp;gt; ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
&gt; &amp;gt; 发件人:&amp;amp;nbsp;"Benchao Li"<[hidden email]&amp;amp;gt;;
&gt; &amp;gt; 发送时间:&amp;amp;nbsp;2020年7月3日(星期五) 晚上9:47
&gt; &amp;gt; 收件人:&amp;amp;nbsp;"user-zh"<[hidden email]&amp;amp;gt;;
&gt; &amp;gt;
&gt; &amp;gt; 主题:&amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
&gt; &amp;gt; 这个已经在1.11中修复了。
&gt; &amp;gt;
&gt; &amp;gt; [1] https://issues.apache.org/jira/browse/FLINK-17942
&gt; &amp;gt;
&gt; &amp;gt; x <[hidden email]&amp;amp;gt; 于2020年7月3日周五 下午4:34写道:
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; ------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------
&gt; &amp;gt; &amp;amp;gt; 发件人:&amp;amp;amp;nbsp;"Jark Wu"<[hidden email]&amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; 发送时间:&amp;amp;amp;nbsp;2020年6月18日(星期四) 中午12:16
&gt; &amp;gt; &amp;amp;gt; 收件人:&amp;amp;amp;nbsp;"user-zh"<[hidden email]
&gt; &amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; 主题:&amp;amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; 是的,我觉得这样子是能绕过的。
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; On Thu, 18 Jun 2020 at 10:34, x <[hidden email]&amp;amp;amp;gt;
&gt; wrote:
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; val resTmpTab: Table = tabEnv.sqlQuery(
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; """
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;amp;amp;gt;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; SELECT
&gt; &amp;gt; MAX(DATE_FORMAT(ts, 'yyyy-MM-dd
&gt; &amp;gt; &amp;amp;gt; HH:mm:00'))
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; time_str,COUNT(DISTINCT userkey) uv
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;amp;amp;gt;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; FROM
&gt; &amp;gt; user_behavior&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; GROUP BY
&gt; &amp;gt; &amp;amp;gt; DATE_FORMAT(ts,
&gt; 'yyyy-MM-dd')&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; """)
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; val
&gt; &amp;gt; resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; .filter(line=&amp;amp;amp;amp;gt;line._1==true).map(line=&amp;amp;amp;amp;gt;line._2)
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; val res= tabEnv.fromDataStream(resTmpStream)
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; tabEnv.sqlUpdate(
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; s"""
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;amp;amp;gt;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; INSERT
&gt; INTO
&gt; &amp;gt; rt_totaluv
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;amp;amp;gt;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; SELECT
&gt; _1,MAX(_2)
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;amp;amp;gt;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; FROM
&gt; $res
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;amp;amp;gt;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; GROUP
&gt; BY _1
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;amp;amp;gt;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; """)
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt;
&gt; ------------------&amp;amp;amp;amp;nbsp;原始邮件&amp;amp;amp;amp;nbsp;------------------
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 发件人:&amp;amp;amp;amp;nbsp;"Jark Wu"<
&gt; [hidden email]&amp;amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 发送时间:&amp;amp;amp;amp;nbsp;2020年6月17日(星期三) 中午1:55
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 收件人:&amp;amp;amp;amp;nbsp;"user-zh"<
&gt; [hidden email]
&gt; &amp;gt; &amp;amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 主题:&amp;amp;amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 在 Flink 1.11 中,你可以尝试这样:
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; CREATE TABLE mysql (
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;nbsp; time_str
&gt; STRING,
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;nbsp; uv BIGINT,
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;nbsp; PRIMARY
&gt; KEY (ts) NOT ENFORCED
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; ) WITH (
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;nbsp;
&gt; 'connector' = 'jdbc',
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;nbsp; 'url' =
&gt; &amp;gt; 'jdbc:mysql://localhost:3306/mydatabase',
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;nbsp;
&gt; 'table-name' = 'myuv'
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; );
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; INSERT INTO mysql
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd
&gt; HH:mm:00')),
&gt; &amp;gt; &amp;amp;gt; COUNT(DISTINCT&amp;amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; user_id)
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; FROM user_behavior;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; On Wed, 17 Jun 2020 at 13:49, x <
&gt; [hidden email]&amp;amp;amp;amp;gt;
&gt; &amp;gt; wrote:
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; sink表这个样式
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; tm uv
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 2020/06/17 13:46:00 10000
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 2020/06/17 13:47:00 20000
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 2020/06/17 13:48:00 30000
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; group by 日期的话,分钟如何获取
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; ------------------&amp;amp;amp;amp;amp;nbsp;原始邮件&amp;amp;amp;amp;amp;nbsp;------------------
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; 发件人:&amp;amp;amp;amp;amp;nbsp;"Benchao Li"<
&gt; &amp;gt; [hidden email]
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; 发送时间:&amp;amp;amp;amp;amp;nbsp;2020年6月17日(星期三) 中午11:46
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; 收件人:&amp;amp;amp;amp;amp;nbsp;"user-zh"<
&gt; &amp;gt; [hidden email]
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 主题:&amp;amp;amp;amp;amp;nbsp;Re:
&gt; 求助:FLINKSQL1.10实时统计累计UV
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; Hi,
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 我感觉这种场景可以有两种方式,
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 1. 可以直接用group by + mini batch
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 2. window聚合 + fast emit
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 对于#1,group
&gt; &amp;gt; by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm,
&gt; &amp;gt; &amp;amp;gt; 'yyyy-MM-dd')。
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 这种情况下的状态清理,需要配置state
&gt; retention时间,配置方法可以参考[1]
&gt; &amp;gt; 。同时,mini
&gt; &amp;gt; &amp;amp;gt; batch的开启也需要
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 用参数[2] 来打开。
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; fast
&gt; &amp;gt; &amp;amp;gt;
&gt; emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; table.exec.emit.early-fire.enabled = true
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; table.exec.emit.early-fire.delay = 60 s
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; [1]
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; [2]
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; x <[hidden email]
&gt; &amp;amp;amp;amp;amp;gt;
&gt; &amp;gt; 于2020年6月17日周三 上午11:14写道:
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt; CREATE
&gt; VIEW uv_per_10min AS
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt;
&gt; SELECT&amp;amp;amp;amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt;
&gt; &amp;amp;amp;amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; MAX(DATE_FORMAT(proctime&amp;amp;amp;amp;amp;amp;nbsp;,
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 'yyyy-MM-dd
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; HH:mm:00'))&amp;amp;amp;amp;amp;amp;nbsp;OVER w
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt; AS
&gt; &amp;gt; time_str,&amp;amp;amp;amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt;
&gt; &amp;amp;amp;amp;amp;amp;nbsp;
&gt; &amp;gt; COUNT(DISTINCT user_id) OVER
&gt; &amp;gt; &amp;amp;gt; w AS uv
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt; FROM
&gt; user_behavior
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt; WINDOW w
&gt; AS (ORDER BY proctime
&gt; &amp;gt; ROWS BETWEEN
&gt; &amp;gt; &amp;amp;gt; UNBOUNDED
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; PRECEDING AND
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt; CURRENT
&gt; ROW);
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt;
&gt; 想请教一下,应该如何处理?
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt; PARTITION
&gt; BY
&gt; &amp;gt; DATE_FORMAT(rowtm, 'yyyy-MM-dd')
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 这样可以吗,另外状态应该如何清理?
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt;
&gt; PS:1.10貌似不支持DDL貌似不支持CREATE
&gt; &amp;gt; VIEW吧
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; &amp;amp;amp;amp;amp;gt; 多谢
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; --
&gt; &amp;gt;
&gt; &amp;gt; Best,
&gt; &amp;gt; Benchao Li
&gt;
&gt;
&gt;
&gt; --
&gt;
&gt; Best,
&gt; Benchao Li



--

Best,
Benchao Li