(无主题)

classic Classic list List threaded Threaded
17 messages Options
Reply | Threaded
Open this post in threaded view
|

(无主题)

罗显宴

大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制
Reply | Threaded
Open this post in threaded view
|

Re: (无主题)

shizk233
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:

>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
>
> 签名由 网易邮箱大师 定制
Reply | Threaded
Open this post in threaded view
|

回复: (无主题)

罗显宴


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制
Reply | Threaded
Open this post in threaded view
|

Re: (无主题)

shizk233
Hi,

从你举的这个例子考虑,仍然可以使用ContinusEventTimeTrigger来持续触发结果更新的,只不过整个窗口可以根据结束条件考虑别的,比如Global窗口。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 下午2:09写道:

>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
>
> 签名由 网易邮箱大师 定制
>
Reply | Threaded
Open this post in threaded view
|

回复:(无主题)

罗显宴
好的,谢谢大佬,我用这个试试


| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制

在2020年07月20日 15:11,shizk233 写道:
Hi,

从你举的这个例子考虑,仍然可以使用ContinusEventTimeTrigger来持续触发结果更新的,只不过整个窗口可以根据结束条件考虑别的,比如Global窗口。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 下午2:09写道:

>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
>
> 签名由 网易邮箱大师 定制
>
Reply | Threaded
Open this post in threaded view
|

回复: (无主题)

罗显宴
In reply to this post by 罗显宴
     我运行的时候,他直接按1小时窗口输出了,并没有按20秒连续输出递增
2020年7月20日 14:09[hidden email] 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制
Reply | Threaded
Open this post in threaded view
|

回复: (无主题)

罗显宴
In reply to this post by 罗显宴
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增

2020年7月20日 14:09[hidden email] 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制
Reply | Threaded
Open this post in threaded view
|

回复: (无主题)

罗显宴

大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
2020年7月20日 20:38[hidden email] 写道:
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增

2020年7月20日 14:09[hidden email] 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制

drug.java (8K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: (无主题)

shizk233
Hi,

我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。

你可以让acc做个累加,然后结果输出里把acc的值带上看看。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 下午8:44写道:

大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
2020年7月20日 20:38[hidden email] 写道:
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增

2020年7月20日 14:09[hidden email] 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制
Reply | Threaded
Open this post in threaded view
|

回复: (无主题)

罗显宴
hi,
CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233<[hidden email]> 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <[hidden email]> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<[hidden email]> 写道:
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<[hidden email]> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制
Reply | Threaded
Open this post in threaded view
|

Re: (无主题)

shizk233
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:29写道:

> hi,
>
> CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:10,shizk233<[hidden email]> 写道:
> Hi,
>
>
> 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
> 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。
>
>
> 你可以让acc做个累加,然后结果输出里把acc的值带上看看。
>
>
> Best,
> shizk233
>
>
> 罗显宴 <[hidden email]> 于2020年7月20日周一 下午8:44写道:
>
>
>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 20:38,罗显宴<[hidden email]> 写道:
> 不好意思,刚才发的快,没来得及解释,
>
> 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 14:09,罗显宴<[hidden email]> 写道:
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
>
> 签名由 网易邮箱大师 定制
>
Reply | Threaded
Open this post in threaded view
|

回复: (无主题)

罗显宴
好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233<[hidden email]> 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:29写道:

hi,

CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233<[hidden email]> 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <[hidden email]> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<[hidden email]> 写道:
不好意思,刚才发的快,没来得及解释,

这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<[hidden email]> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:



大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制

Reply | Threaded
Open this post in threaded view
|

Re: (无主题)

shizk233
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:53写道:

> 好的,
> 输入:
> 心功能不全和心律失常用药,1,时间戳
> 心功能不全和心律失常用药,1,时间戳
> 抗利尿剂,1,时间戳
> 血管收缩剂,1,时间戳
> 血管紧张素II受体拮抗剂,1,时间戳
>
>
> 这里的时间戳就是eventtime了
> 比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
> 心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
> 所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
> 接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
> 输出4
>
>
> 即输出:
> 2020-7-20 19:00:00,2
> 2020-7-20 19:00:20,4
>
>
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:37,shizk233<[hidden email]> 写道:
> Hi,
>
> 有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗
>
> Best,
> shizk233
>
> 罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:29写道:
>
> hi,
>
>
> CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:10,shizk233<[hidden email]> 写道:
> Hi,
>
>
> 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
> 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。
>
>
> 你可以让acc做个累加,然后结果输出里把acc的值带上看看。
>
>
> Best,
> shizk233
>
>
> 罗显宴 <[hidden email]> 于2020年7月20日周一 下午8:44写道:
>
>
>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 20:38,罗显宴<[hidden email]> 写道:
> 不好意思,刚才发的快,没来得及解释,
>
>
> 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 14:09,罗显宴<[hidden email]> 写道:
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:
>
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
>
> 签名由 网易邮箱大师 定制
>
>
Reply | Threaded
Open this post in threaded view
|

回复: (无主题)

罗显宴


hi,
我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 13:58,shizk233<[hidden email]> 写道:
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:53写道:

好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233<[hidden email]> 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:29写道:

hi,


CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233<[hidden email]> 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <[hidden email]> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<[hidden email]> 写道:
不好意思,刚才发的快,没来得及解释,


这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<[hidden email]> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:




大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制


Reply | Threaded
Open this post in threaded view
|

回复: (无主题)

罗显宴
hi,我想到解决办法了,可以用全局window,我一直以为是要分区在做窗口运算其实可以直接用timewindowAll来算,然后用状态保存就够了
val result = num.timeWindowAll(Time.seconds(20))
//        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
.process(new ProcessAllWindowFunction[IncreaseNumPerHour,IncreasePerHour,TimeWindow] {

private var itemState: MapState[String,Int] = _

override def open(parameters: Configuration): Unit = {
itemState = getRuntimeContext.getMapState(new MapStateDescriptor[String,Int]("item-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int])))
          }

override def process(context: Context, elements: Iterable[IncreaseNumPerHour], out: Collector[IncreasePerHour]): Unit = {
var timestamp:Long = 0L
elements.foreach(kv => {
itemState.put(kv.category, 1)
timestamp = (kv.timestamp/2000+1)*2000
})
import scala.collection.JavaConversions._
            out.collect(IncreasePerHour(new Timestamp( timestamp - 1 ).toString,itemState.keys().size))
          }
        })


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 14:15,罗显宴<[hidden email]> 写道:


hi,
我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 13:58,shizk233<[hidden email]> 写道:
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:53写道:

好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233<[hidden email]> 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:29写道:

hi,


CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233<[hidden email]> 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <[hidden email]> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<[hidden email]> 写道:
不好意思,刚才发的快,没来得及解释,


这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<[hidden email]> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:




大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制


Reply | Threaded
Open this post in threaded view
|

回复:(无主题)

罗显宴
感谢shizk233大佬,我这个问题终于得到解决,我主要是通过全窗口加mapstate实现的
best
shizk233


| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制

在2020年07月21日 15:04,罗显宴 写道:
hi,我想到解决办法了,可以用全局window,我一直以为是要分区在做窗口运算其实可以直接用timewindowAll来算,然后用状态保存就够了
val result = num.timeWindowAll(Time.seconds(20))
//        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
.process(new ProcessAllWindowFunction[IncreaseNumPerHour,IncreasePerHour,TimeWindow] {

private var itemState: MapState[String,Int] = _

override def open(parameters: Configuration): Unit = {
itemState = getRuntimeContext.getMapState(new MapStateDescriptor[String,Int]("item-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int])))
         }

override def process(context: Context, elements: Iterable[IncreaseNumPerHour], out: Collector[IncreasePerHour]): Unit = {
var timestamp:Long = 0L
elements.foreach(kv => {
itemState.put(kv.category, 1)
timestamp = (kv.timestamp/2000+1)*2000
})
import scala.collection.JavaConversions._
           out.collect(IncreasePerHour(new Timestamp( timestamp - 1 ).toString,itemState.keys().size))
         }
       })


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 14:15,罗显宴<[hidden email]> 写道:


hi,
我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 13:58,shizk233<[hidden email]> 写道:
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:53写道:

好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233<[hidden email]> 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:29写道:

hi,


CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233<[hidden email]> 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <[hidden email]> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<[hidden email]> 写道:
不好意思,刚才发的快,没来得及解释,


这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<[hidden email]> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:[hidden email]
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:




大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制


Reply | Threaded
Open this post in threaded view
|

Re: (无主题)

shizk233
恭喜!

罗显宴 <[hidden email]> 于2020年7月23日周四 上午1:14写道:

> 感谢shizk233大佬,我这个问题终于得到解决,我主要是通过全窗口加mapstate实现的
> best
> shizk233
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月21日 15:04,罗显宴 写道:
> hi,我想到解决办法了,可以用全局window,我一直以为是要分区在做窗口运算其实可以直接用timewindowAll来算,然后用状态保存就够了
> val result = num.timeWindowAll(Time.seconds(20))
> //        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
> .process(new
> ProcessAllWindowFunction[IncreaseNumPerHour,IncreasePerHour,TimeWindow] {
>
> private var itemState: MapState[String,Int] = _
>
> override def open(parameters: Configuration): Unit = {
> itemState = getRuntimeContext.getMapState(new
> MapStateDescriptor[String,Int]("item-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int])))
>          }
>
> override def process(context: Context, elements:
> Iterable[IncreaseNumPerHour], out: Collector[IncreasePerHour]): Unit = {
> var timestamp:Long = 0L
> elements.foreach(kv => {
> itemState.put(kv.category, 1)
> timestamp = (kv.timestamp/2000+1)*2000
> })
> import scala.collection.JavaConversions._
>            out.collect(IncreasePerHour(new Timestamp( timestamp - 1
> ).toString,itemState.keys().size))
>          }
>        })
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 14:15,罗显宴<[hidden email]> 写道:
>
>
> hi,
>
> 我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 13:58,shizk233<[hidden email]> 写道:
> Hi,
>
> 首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。
>
> 按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
> 也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
> 这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
> 那么timer在12点的时候会输出的结果就是(12点,1)。
>
> 如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。
>
> 而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
> 期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。
>
> 我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿
>
>
> Best,
> shizk233
>
>
> 罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:53写道:
>
> 好的,
> 输入:
> 心功能不全和心律失常用药,1,时间戳
> 心功能不全和心律失常用药,1,时间戳
> 抗利尿剂,1,时间戳
> 血管收缩剂,1,时间戳
> 血管紧张素II受体拮抗剂,1,时间戳
>
>
> 这里的时间戳就是eventtime了
> 比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
> 心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
> 所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
> 接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
> 输出4
>
>
> 即输出:
> 2020-7-20 19:00:00,2
> 2020-7-20 19:00:20,4
>
>
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:37,shizk233<[hidden email]> 写道:
> Hi,
>
> 有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗
>
> Best,
> shizk233
>
> 罗显宴 <[hidden email]> 于2020年7月21日周二 上午11:29写道:
>
> hi,
>
>
>
> CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:10,shizk233<[hidden email]> 写道:
> Hi,
>
>
> 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
> 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。
>
>
> 你可以让acc做个累加,然后结果输出里把acc的值带上看看。
>
>
> Best,
> shizk233
>
>
> 罗显宴 <[hidden email]> 于2020年7月20日周一 下午8:44写道:
>
>
>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 20:38,罗显宴<[hidden email]> 写道:
> 不好意思,刚才发的快,没来得及解释,
>
>
>
> 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 14:09,罗显宴<[hidden email]> 写道:
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233<[hidden email]> 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <[hidden email]> 于2020年7月20日周一 上午1:18写道:
>
>
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:[hidden email]
> |
>
> 签名由 网易邮箱大师 定制
>
>
>