I'm working on a Flink real-time data warehouse project that includes a daily-report scenario, running in flink-on-yarn mode. If the job is never stopped, the second day's results are accumulated on top of the first day's. My read rule (record TimeStamp > current_date) is supposed to treat only matching rows as today's data, yet as long as the cluster keeps running, day two's results still pile up on day one's. The only state-related setting in the code is env.setStateBackend(new MemoryStateBackend()). For now I restart the job once a day, which releases the state held in memory and keeps results from accumulating on top of yesterday's. The source is the upsert-kafka connector, and the SQL is written against the DWD layer. Below is the exact SQL I run; all the tables it uses come from upsert-kafka sources in the DWD layer.
select
    TO_DATE(cast(doi.DeliveryTime as String), 'yyyy-MM-dd') as days,
    doi.UserId,
    count(doi.Code) as SendTime,
    sum(doi.PayAmount / 100) as SendCashcharge,
    sum(doi.PayAmount / 100 - ChargeAmount / 100 + UseBalance / 100) as SendCashuse,
    sum(doi.CashMoney / 100) as SendCash
from dwd_order_info doi
where doi.DeliveryTime > cast(current_date AS TIMESTAMP)
    and doi.OrderType = 29
    and doi.Status >= 50
    and doi.Status <> 60
group by
    TO_DATE(cast(doi.DeliveryTime as String), 'yyyy-MM-dd'),
    doi.UserId
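One common way to keep an unbounded GROUP BY like this from holding state forever is idle state retention (state TTL); a minimal sketch, assuming a Flink version whose table configuration supports the table.exec.state.ttl option:

    -- Let Flink drop aggregation state for any (days, UserId) key that has
    -- been idle longer than 36 hours, so yesterday's groups are cleaned up
    -- without restarting the job. SQL-client syntax; the same option can
    -- also be set programmatically on TableConfig.
    SET 'table.exec.state.ttl' = '36 h';

On older Flink versions the equivalent knob is TableConfig#setIdleStateRetentionTime.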
Why not use a day-level tumble window? It would clear the state for you automatically.
On Wed, 6 Jan 2021 at 13:53, 徐州州 <[hidden email]> wrote:
> [quoted original message trimmed]
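A day-level tumble-window version of the query might look like the following; a minimal sketch, assuming dwd_order_info declares DeliveryTime as an event-time attribute (with a watermark) and produces append-only input, which classic group windows require:

    select
        TUMBLE_START(doi.DeliveryTime, INTERVAL '1' DAY) as days,
        doi.UserId,
        count(doi.Code) as SendTime,
        sum(doi.PayAmount / 100) as SendCashcharge,
        sum(doi.PayAmount / 100 - ChargeAmount / 100 + UseBalance / 100) as SendCashuse,
        sum(doi.CashMoney / 100) as SendCash
    from dwd_order_info doi
    where doi.OrderType = 29 and doi.Status >= 50 and doi.Status <> 60
    -- The window replaces both the current_date filter and the TO_DATE
    -- group key: when a daily window fires, Flink emits the result and
    -- drops that window's state, so nothing lingers across days.
    group by TUMBLE(doi.DeliveryTime, INTERVAL '1' DAY), doi.UserId

Whether this works directly on an upsert-kafka source depends on the changelog it produces, since group window aggregates generally cannot consume update/delete changes.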
group by [time field]
We have a similar scenario. Each day's data carries a different date, so grouping by the time field shouldn't let today's results accumulate onto yesterday's, should it?

At 2021-01-14 15:06:26, "Jark Wu" <[hidden email]> wrote:
>Why not use a day-level tumble window? It would clear the state for you automatically.
> [earlier quoted text trimmed]
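For reference, the tumble-window rewrite above also requires the source table to expose DeliveryTime as an event-time attribute. A hypothetical DDL sketch; the column types, topic name, and connector options here are illustrative assumptions, not taken from the thread:

    -- Hypothetical upsert-kafka source with an event-time attribute.
    CREATE TABLE dwd_order_info (
        Code STRING,
        UserId BIGINT,
        OrderType INT,
        Status INT,
        PayAmount BIGINT,
        ChargeAmount BIGINT,
        UseBalance BIGINT,
        CashMoney BIGINT,
        DeliveryTime TIMESTAMP(3),
        -- Watermark makes DeliveryTime usable in TUMBLE(...).
        WATERMARK FOR DeliveryTime AS DeliveryTime - INTERVAL '5' SECOND,
        -- upsert-kafka requires a primary key.
        PRIMARY KEY (Code) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'dwd_order_info',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    );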