需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
CREATE VIEW uv_per_10min AS SELECT MAX(DATE_FORMAT(proctime , 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str, COUNT(DISTINCT user_id) OVER w AS uv FROM user_behavior WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW); 想请教一下,应该如何处理? PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理? PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 多谢 |
Hi,
我感觉这种场景可以有两种方式, 1. 可以直接用group by + mini batch 2. window聚合 + fast emit 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 用参数[2] 来打开。 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: table.exec.emit.early-fire.enabled = true table.exec.emit.early-fire.delay = 60 s [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html x <[hidden email]> 于2020年6月17日周三 上午11:14写道: > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > CREATE VIEW uv_per_10min AS > SELECT > MAX(DATE_FORMAT(proctime , 'yyyy-MM-dd HH:mm:00')) OVER w > AS time_str, > COUNT(DISTINCT user_id) OVER w AS uv > FROM user_behavior > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > CURRENT ROW); > > > 想请教一下,应该如何处理? > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理? > PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > 多谢 |
Administrator
|
本超提的两个方案也是阿里内部解决这个问题最常用的方式,但是 1.10 会有 primary key 的限制,要等到 1.11 才行。
另外这两个方案在追数据时,都可能会有毛刺现象(有几分钟没有值,因为数据追太快,跳过了)。 On Wed, 17 Jun 2020 at 11:46, Benchao Li <[hidden email]> wrote: > Hi, > 我感觉这种场景可以有两种方式, > 1. 可以直接用group by + mini batch > 2. window聚合 + fast emit > > 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。 > 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 > 用参数[2] 来打开。 > > 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > table.exec.emit.early-fire.enabled = true > table.exec.emit.early-fire.delay = 60 s > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > [2] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > x <[hidden email]> 于2020年6月17日周三 上午11:14写道: > > > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > > CREATE VIEW uv_per_10min AS > > SELECT > > MAX(DATE_FORMAT(proctime , 'yyyy-MM-dd HH:mm:00')) OVER > w > > AS time_str, > > COUNT(DISTINCT user_id) OVER w AS uv > > FROM user_behavior > > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > > CURRENT ROW); > > > > > > 想请教一下,应该如何处理? > > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理? > > PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > > 多谢 > |
In reply to this post by Benchao Li-2
感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
sink表这个样式 tm uv 2020/06/17 13:46:00 10000 2020/06/17 13:47:00 20000 2020/06/17 13:48:00 30000 group by 日期的话,分钟如何获取 ------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年6月17日(星期三) 中午11:46 收件人: "user-zh"<[hidden email]>; 主题: Re: 求助:FLINKSQL1.10实时统计累计UV Hi, 我感觉这种场景可以有两种方式, 1. 可以直接用group by + mini batch 2. window聚合 + fast emit 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 用参数[2] 来打开。 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: table.exec.emit.early-fire.enabled = true table.exec.emit.early-fire.delay = 60 s [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html x <[hidden email]> 于2020年6月17日周三 上午11:14写道: > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > CREATE VIEW uv_per_10min AS > SELECT&nbsp; > &nbsp; MAX(DATE_FORMAT(proctime&nbsp;, 'yyyy-MM-dd HH:mm:00'))&nbsp;OVER w > AS time_str,&nbsp; > &nbsp; COUNT(DISTINCT user_id) OVER w AS uv > FROM user_behavior > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > CURRENT ROW); > > > 想请教一下,应该如何处理? > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理? > PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > 多谢 |
Administrator
|
在 Flink 1.11 中,你可以尝试这样:
CREATE TABLE mysql ( time_str STRING, uv BIGINT, PRIMARY KEY (ts) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'myuv' ); INSERT INTO mysql SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id) FROM user_behavior; On Wed, 17 Jun 2020 at 13:49, x <[hidden email]> wrote: > 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > sink表这个样式 > tm uv > 2020/06/17 13:46:00 10000 > 2020/06/17 13:47:00 20000 > 2020/06/17 13:48:00 30000 > > > group by 日期的话,分钟如何获取 > > > ------------------ 原始邮件 ------------------ > 发件人: "Benchao Li"<[hidden email]>; > 发送时间: 2020年6月17日(星期三) 中午11:46 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 求助:FLINKSQL1.10实时统计累计UV > > > > Hi, > 我感觉这种场景可以有两种方式, > 1. 可以直接用group by + mini batch > 2. window聚合 + fast emit > > 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。 > 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 > 用参数[2] 来打开。 > > 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > table.exec.emit.early-fire.enabled = true > table.exec.emit.early-fire.delay = 60 s > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > [2] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > x <[hidden email]> 于2020年6月17日周三 上午11:14写道: > > > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > > CREATE VIEW uv_per_10min AS > > SELECT&nbsp; > > &nbsp; MAX(DATE_FORMAT(proctime&nbsp;, 'yyyy-MM-dd > HH:mm:00'))&nbsp;OVER w > > AS time_str,&nbsp; > > &nbsp; COUNT(DISTINCT user_id) OVER w AS uv > > FROM user_behavior > > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > > CURRENT ROW); > > > > > > 想请教一下,应该如何处理? > > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理? > > PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > > 多谢 |
明白了,十分感谢
------------------ 原始邮件 ------------------ 发件人: "Jark Wu"<[hidden email]>; 发送时间: 2020年6月17日(星期三) 中午1:55 收件人: "user-zh"<[hidden email]>; 主题: Re: 求助:FLINKSQL1.10实时统计累计UV 在 Flink 1.11 中,你可以尝试这样: CREATE TABLE mysql ( time_str STRING, uv BIGINT, PRIMARY KEY (ts) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'myuv' ); INSERT INTO mysql SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id) FROM user_behavior; On Wed, 17 Jun 2020 at 13:49, x <[hidden email]> wrote: > 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > sink表这个样式 > tm uv > 2020/06/17 13:46:00 10000 > 2020/06/17 13:47:00 20000 > 2020/06/17 13:48:00 30000 > > > group by 日期的话,分钟如何获取 > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月17日(星期三) 中午11:46 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > Hi, > 我感觉这种场景可以有两种方式, > 1. 可以直接用group by + mini batch > 2. window聚合 + fast emit > > 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。 > 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 > 用参数[2] 来打开。 > > 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > table.exec.emit.early-fire.enabled = true > table.exec.emit.early-fire.delay = 60 s > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > [2] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > x <[hidden email]&gt; 于2020年6月17日周三 上午11:14写道: > > &gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > &gt; CREATE VIEW uv_per_10min AS > &gt; SELECT&amp;nbsp; > &gt; &amp;nbsp; MAX(DATE_FORMAT(proctime&amp;nbsp;, 'yyyy-MM-dd > HH:mm:00'))&amp;nbsp;OVER w > &gt; AS time_str,&amp;nbsp; > &gt; &amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv > &gt; FROM user_behavior > &gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > &gt; CURRENT ROW); > &gt; > &gt; > &gt; 想请教一下,应该如何处理? > &gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理? > &gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > &gt; 多谢 |
In reply to this post by Jark
如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
val resTmpTab: Table = tabEnv.sqlQuery( """ SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd') """) val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) .filter(line=>line._1==true).map(line=>line._2) val res= tabEnv.fromDataStream(resTmpStream) tabEnv.sqlUpdate( s""" INSERT INTO rt_totaluv SELECT _1,MAX(_2) FROM $res GROUP BY _1 """) ------------------ 原始邮件 ------------------ 发件人: "Jark Wu"<[hidden email]>; 发送时间: 2020年6月17日(星期三) 中午1:55 收件人: "user-zh"<[hidden email]>; 主题: Re: 求助:FLINKSQL1.10实时统计累计UV 在 Flink 1.11 中,你可以尝试这样: CREATE TABLE mysql ( time_str STRING, uv BIGINT, PRIMARY KEY (ts) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'myuv' ); INSERT INTO mysql SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id) FROM user_behavior; On Wed, 17 Jun 2020 at 13:49, x <[hidden email]> wrote: > 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > sink表这个样式 > tm uv > 2020/06/17 13:46:00 10000 > 2020/06/17 13:47:00 20000 > 2020/06/17 13:48:00 30000 > > > group by 日期的话,分钟如何获取 > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月17日(星期三) 中午11:46 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > Hi, > 我感觉这种场景可以有两种方式, > 1. 可以直接用group by + mini batch > 2. window聚合 + fast emit > > 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。 > 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 > 用参数[2] 来打开。 > > 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > table.exec.emit.early-fire.enabled = true > table.exec.emit.early-fire.delay = 60 s > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > [2] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > x <[hidden email]&gt; 于2020年6月17日周三 上午11:14写道: > > &gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > &gt; CREATE VIEW uv_per_10min AS > &gt; SELECT&amp;nbsp; > &gt; &amp;nbsp; MAX(DATE_FORMAT(proctime&amp;nbsp;, 'yyyy-MM-dd > HH:mm:00'))&amp;nbsp;OVER w > &gt; AS time_str,&amp;nbsp; > &gt; &amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv > &gt; FROM user_behavior > &gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > &gt; CURRENT ROW); > &gt; > &gt; > &gt; 想请教一下,应该如何处理? > &gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理? > &gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > &gt; 多谢 |
Administrator
|
是的,我觉得这样子是能绕过的。
On Thu, 18 Jun 2020 at 10:34, x <[hidden email]> wrote: > 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? > val resTmpTab: Table = tabEnv.sqlQuery( > """ > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) > time_str,COUNT(DISTINCT userkey) uv > FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd') """) > > val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) > .filter(line=>line._1==true).map(line=>line._2) > > val res= tabEnv.fromDataStream(resTmpStream) > tabEnv.sqlUpdate( > s""" > INSERT INTO rt_totaluv > SELECT _1,MAX(_2) > FROM $res > GROUP BY _1 > """) > > > ------------------ 原始邮件 ------------------ > 发件人: "Jark Wu"<[hidden email]>; > 发送时间: 2020年6月17日(星期三) 中午1:55 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 在 Flink 1.11 中,你可以尝试这样: > > CREATE TABLE mysql ( > time_str STRING, > uv BIGINT, > PRIMARY KEY (ts) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/mydatabase', > 'table-name' = 'myuv' > ); > > INSERT INTO mysql > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT > user_id) > FROM user_behavior; > > On Wed, 17 Jun 2020 at 13:49, x <[hidden email]> wrote: > > > 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > > sink表这个样式 > > tm uv > > 2020/06/17 13:46:00 10000 > > 2020/06/17 13:47:00 20000 > > 2020/06/17 13:48:00 30000 > > > > > > group by 日期的话,分钟如何获取 > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;; > > 发送时间:&nbsp;2020年6月17日(星期三) 中午11:46 > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > > > > > Hi, > > 我感觉这种场景可以有两种方式, > > 1. 可以直接用group by + mini batch > > 2. window聚合 + fast emit > > > > 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。 > > 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 > > 用参数[2] 来打开。 > > > > 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > > fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > > table.exec.emit.early-fire.enabled = true > > table.exec.emit.early-fire.delay = 60 s > > > > [1] > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > > [2] > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > > > x <[hidden email]&gt; 于2020年6月17日周三 上午11:14写道: > > > > &gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > > &gt; CREATE VIEW uv_per_10min AS > > &gt; SELECT&amp;nbsp; > > &gt; &amp;nbsp; MAX(DATE_FORMAT(proctime&amp;nbsp;, > 'yyyy-MM-dd > > HH:mm:00'))&amp;nbsp;OVER w > > &gt; AS time_str,&amp;nbsp; > > &gt; &amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv > > &gt; FROM user_behavior > > &gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED > PRECEDING AND > > &gt; CURRENT ROW); > > &gt; > > &gt; > > &gt; 想请教一下,应该如何处理? > > &gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') > 这样可以吗,另外状态应该如何清理? > > &gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > > &gt; 多谢 |
您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。 ------------------ 原始邮件 ------------------ 发件人: "Jark Wu"<[hidden email]>; 发送时间: 2020年6月18日(星期四) 中午12:16 收件人: "user-zh"<[hidden email]>; 主题: Re: 求助:FLINKSQL1.10实时统计累计UV 是的,我觉得这样子是能绕过的。 On Thu, 18 Jun 2020 at 10:34, x <[hidden email]> wrote: > 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? > val resTmpTab: Table = tabEnv.sqlQuery( > """ > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) > time_str,COUNT(DISTINCT userkey) uv > FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd') """) > > val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) > .filter(line=&gt;line._1==true).map(line=&gt;line._2) > > val res= tabEnv.fromDataStream(resTmpStream) > tabEnv.sqlUpdate( > s""" > INSERT INTO rt_totaluv > SELECT _1,MAX(_2) > FROM $res > GROUP BY _1 > """) > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月17日(星期三) 中午1:55 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 在 Flink 1.11 中,你可以尝试这样: > > CREATE TABLE mysql ( > &nbsp;&nbsp; time_str STRING, > &nbsp;&nbsp; uv BIGINT, > &nbsp;&nbsp; PRIMARY KEY (ts) NOT ENFORCED > ) WITH ( > &nbsp;&nbsp; 'connector' = 'jdbc', > &nbsp;&nbsp; 'url' = 'jdbc:mysql://localhost:3306/mydatabase', > &nbsp;&nbsp; 'table-name' = 'myuv' > ); > > INSERT INTO mysql > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT&nbsp; > user_id) > FROM user_behavior; > > On Wed, 17 Jun 2020 at 13:49, x <[hidden email]&gt; wrote: > > &gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > &gt; sink表这个样式 > &gt; tm uv > &gt; 2020/06/17 13:46:00 10000 > &gt; 2020/06/17 13:47:00 20000 > &gt; 2020/06/17 13:48:00 30000 > &gt; > &gt; > &gt; group by 日期的话,分钟如何获取 > &gt; > &gt; > &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ > &gt; 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;; > &gt; 发送时间:&amp;nbsp;2020年6月17日(星期三) 中午11:46 > &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;; > &gt; > &gt; 主题:&amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > &gt; > &gt; > &gt; > &gt; Hi, > &gt; 我感觉这种场景可以有两种方式, > &gt; 1. 可以直接用group by + mini batch > &gt; 2. window聚合 + fast emit > &gt; > &gt; 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。 > &gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 > &gt; 用参数[2] 来打开。 > &gt; > &gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > &gt; fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > &gt; table.exec.emit.early-fire.enabled = true > &gt; table.exec.emit.early-fire.delay = 60 s > &gt; > &gt; [1] > &gt; > &gt; > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > &gt; [2] > &gt; > &gt; > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > &gt; > &gt; x <[hidden email]&amp;gt; 于2020年6月17日周三 上午11:14写道: > &gt; > &gt; &amp;gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > &gt; &amp;gt; CREATE VIEW uv_per_10min AS > &gt; &amp;gt; SELECT&amp;amp;nbsp; > &gt; &amp;gt; &amp;amp;nbsp; MAX(DATE_FORMAT(proctime&amp;amp;nbsp;, > 'yyyy-MM-dd > &gt; HH:mm:00'))&amp;amp;nbsp;OVER w > &gt; &amp;gt; AS time_str,&amp;amp;nbsp; > &gt; &amp;gt; &amp;amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv > &gt; &amp;gt; FROM user_behavior > &gt; &amp;gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED > PRECEDING AND > &gt; &amp;gt; CURRENT ROW); > &gt; &amp;gt; > &gt; &amp;gt; > &gt; &amp;gt; 想请教一下,应该如何处理? > &gt; &amp;gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') > 这样可以吗,另外状态应该如何清理? > &gt; &amp;gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > &gt; &amp;gt; 多谢 |
你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
这个已经在1.11中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-17942 x <[hidden email]> 于2020年7月3日周五 下午4:34写道: > 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, > > 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Jark Wu"<[hidden email]>; > 发送时间: 2020年6月18日(星期四) 中午12:16 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 是的,我觉得这样子是能绕过的。 > > On Thu, 18 Jun 2020 at 10:34, x <[hidden email]> wrote: > > > 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? > > val resTmpTab: Table = tabEnv.sqlQuery( > > """ > > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd > HH:mm:00')) > > time_str,COUNT(DISTINCT userkey) uv > > FROM user_behavior GROUP BY > DATE_FORMAT(ts, 'yyyy-MM-dd') """) > > > > val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) > > > .filter(line=&gt;line._1==true).map(line=&gt;line._2) > > > > val res= tabEnv.fromDataStream(resTmpStream) > > tabEnv.sqlUpdate( > > s""" > > INSERT INTO rt_totaluv > > SELECT _1,MAX(_2) > > FROM $res > > GROUP BY _1 > > """) > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;; > > 发送时间:&nbsp;2020年6月17日(星期三) 中午1:55 > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > > > > > 在 Flink 1.11 中,你可以尝试这样: > > > > CREATE TABLE mysql ( > > &nbsp;&nbsp; time_str STRING, > > &nbsp;&nbsp; uv BIGINT, > > &nbsp;&nbsp; PRIMARY KEY (ts) NOT ENFORCED > > ) WITH ( > > &nbsp;&nbsp; 'connector' = 'jdbc', > > &nbsp;&nbsp; 'url' = 'jdbc:mysql://localhost:3306/mydatabase', > > &nbsp;&nbsp; 'table-name' = 'myuv' > > ); > > > > INSERT INTO mysql > > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), > COUNT(DISTINCT&nbsp; > > user_id) > > FROM user_behavior; > > > > On Wed, 17 Jun 2020 at 13:49, x <[hidden email]&gt; wrote: > > > > &gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > > &gt; sink表这个样式 > > &gt; tm uv > > &gt; 2020/06/17 13:46:00 10000 > > &gt; 2020/06/17 13:47:00 20000 > > &gt; 2020/06/17 13:48:00 30000 > > &gt; > > &gt; > > &gt; group by 日期的话,分钟如何获取 > > &gt; > > &gt; > > &gt; > ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ > > &gt; 发件人:&amp;nbsp;"Benchao Li"<[hidden email] > &amp;gt;; > > &gt; 发送时间:&amp;nbsp;2020年6月17日(星期三) 中午11:46 > > &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email] > &amp;gt;; > > &gt; > > &gt; 主题:&amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > &gt; > > &gt; > > &gt; > > &gt; Hi, > > &gt; 我感觉这种场景可以有两种方式, > > &gt; 1. 可以直接用group by + mini batch > > &gt; 2. window聚合 + fast emit > > &gt; > > &gt; 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, > 'yyyy-MM-dd')。 > > &gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini > batch的开启也需要 > > &gt; 用参数[2] 来打开。 > > &gt; > > &gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > > &gt; fast > emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > > &gt; table.exec.emit.early-fire.enabled = true > > &gt; table.exec.emit.early-fire.delay = 60 s > > &gt; > > &gt; [1] > > &gt; > > &gt; > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > > &gt; [2] > > &gt; > > &gt; > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > &gt; > > &gt; x <[hidden email]&amp;gt; 于2020年6月17日周三 上午11:14写道: > > &gt; > > &gt; &amp;gt; > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > > &gt; &amp;gt; CREATE VIEW uv_per_10min AS > > &gt; &amp;gt; SELECT&amp;amp;nbsp; > > &gt; &amp;gt; &amp;amp;nbsp; > MAX(DATE_FORMAT(proctime&amp;amp;nbsp;, > > 'yyyy-MM-dd > > &gt; HH:mm:00'))&amp;amp;nbsp;OVER w > > &gt; &amp;gt; AS time_str,&amp;amp;nbsp; > > &gt; &amp;gt; &amp;amp;nbsp; COUNT(DISTINCT user_id) OVER > w AS uv > > &gt; &amp;gt; FROM user_behavior > > &gt; &amp;gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN > UNBOUNDED > > PRECEDING AND > > &gt; &amp;gt; CURRENT ROW); > > &gt; &amp;gt; > > &gt; &amp;gt; > > &gt; &amp;gt; 想请教一下,应该如何处理? > > &gt; &amp;gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') > > 这样可以吗,另外状态应该如何清理? > > &gt; &amp;gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > > &gt; &amp;gt; 多谢 -- Best, Benchao Li |
Free forum by Nabble | Edit this page |