请问大神们flink-sql可以指定时间清除内存中的全部State吗?

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

请问大神们flink-sql可以指定时间清除内存中的全部State吗?

徐州州
最近在做一个flink实时数仓项目,有个日报的场景,使用flink-on-yarn模式,如果job不停止,第二天的结果就在第一天的基础上累加了,我计算的读取规则(数据TimeStamp>current_date)就认为是今天的数据,可是如果集群不停止第二天的计算结果就会在第一天累加,代码中只设置了env.setStateBackend(new MemoryStateBackend()),目前我是每天重启一下job才可以释放内存中的State避免在昨天的基础上累计。我数据源是connector的upsert-kafka,然后基于dwd层编写sql。下面是我执行的具体sql,其中所用的表都来自dwd层的upsert-kafka数据源。
|                  select
|                       TO_DATE(cast(doi.DeliveryTime as String),'yyyy-MM-dd') as  days,
|                       doi.UserId,
|                       count(doi.Code) as   SendTime,
|                       sum(doi.PayAmount / 100) as SendCashcharge,
|                       sum(doi.PayAmount / 100 - ChargeAmount / 100 + UseBalance / 100) as  SendCashuse,
|                       sum(doi.CashMoney / 100)as  SendCash
|                    from dwd_order_info doi
|                    where doi.DeliveryTime &gt;cast(current_date AS TIMESTAMP) and doi.OrderType = 29 and doi.Status &gt;= 50 and doi.Status <&gt; 60
|                    group by TO_DATE(cast(doi.DeliveryTime as String),'yyyy-MM-dd'), doi.UserId
Reply | Threaded
Open this post in threaded view
|

Re: 请问大神们flink-sql可以指定时间清除内存中的全部State吗?

Jark
Administrator
为啥不用天级别的tumble window? 自动就帮你清楚 state 了

On Wed, 6 Jan 2021 at 13:53, 徐州州 <[hidden email]> wrote:

> 最近在做一个flink实时数仓项目,有个日报的场景,使用flink-on-yarn模式,如果job不停止,第二天的结果就在第一天的基础上累加了,我计算的读取规则(数据TimeStamp&gt;current_date)就认为是今天的数据,可是如果集群不停止第二天的计算结果就会在第一天累加,代码中只设置了env.setStateBackend(new
> MemoryStateBackend()),目前我是每天重启一下job才可以释放内存中的State避免在昨天的基础上累计。我数据源是connector的upsert-kafka,然后基于dwd层编写sql。下面是我执行的具体sql,其中所用的表都来自dwd层的upsert-kafka数据源。
> |                  select
> |                       TO_DATE(cast(doi.DeliveryTime as
> String),'yyyy-MM-dd') as  days,
> |                       doi.UserId,
> |                       count(doi.Code) as   SendTime,
> |                       sum(doi.PayAmount / 100) as SendCashcharge,
> |                       sum(doi.PayAmount / 100 - ChargeAmount / 100 +
> UseBalance / 100) as  SendCashuse,
> |                       sum(doi.CashMoney / 100)as  SendCash
> |                    from dwd_order_info doi
> |                    where doi.DeliveryTime &gt;cast(current_date AS
> TIMESTAMP) and doi.OrderType = 29 and doi.Status &gt;= 50 and doi.Status
> <&gt; 60
> |                    group by TO_DATE(cast(doi.DeliveryTime as
> String),'yyyy-MM-dd'), doi.UserId
Reply | Threaded
Open this post in threaded view
|

Re:Re: 请问大神们flink-sql可以指定时间清除内存中的全部State吗?

Michael Ran
group by [时间字段]
我们也有类似场景,每天数据的时间是不一样的,这样不会导致今天的数据累加才对啊?

















在 2021-01-14 15:06:26,"Jark Wu" <[hidden email]> 写道:

>为啥不用天级别的tumble window? 自动就帮你清楚 state 了
>
>On Wed, 6 Jan 2021 at 13:53, 徐州州 <[hidden email]> wrote:
>
>> 最近在做一个flink实时数仓项目,有个日报的场景,使用flink-on-yarn模式,如果job不停止,第二天的结果就在第一天的基础上累加了,我计算的读取规则(数据TimeStamp&gt;current_date)就认为是今天的数据,可是如果集群不停止第二天的计算结果就会在第一天累加,代码中只设置了env.setStateBackend(new
>> MemoryStateBackend()),目前我是每天重启一下job才可以释放内存中的State避免在昨天的基础上累计。我数据源是connector的upsert-kafka,然后基于dwd层编写sql。下面是我执行的具体sql,其中所用的表都来自dwd层的upsert-kafka数据源。
>> |                  select
>> |                       TO_DATE(cast(doi.DeliveryTime as
>> String),'yyyy-MM-dd') as  days,
>> |                       doi.UserId,
>> |                       count(doi.Code) as   SendTime,
>> |                       sum(doi.PayAmount / 100) as SendCashcharge,
>> |                       sum(doi.PayAmount / 100 - ChargeAmount / 100 +
>> UseBalance / 100) as  SendCashuse,
>> |                       sum(doi.CashMoney / 100)as  SendCash
>> |                    from dwd_order_info doi
>> |                    where doi.DeliveryTime &gt;cast(current_date AS
>> TIMESTAMP) and doi.OrderType = 29 and doi.Status &gt;= 50 and doi.Status
>> <&gt; 60
>> |                    group by TO_DATE(cast(doi.DeliveryTime as
>> String),'yyyy-MM-dd'), doi.UserId