flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

classic Classic list List threaded Threaded
23 messages Options
12
Reply | Threaded
Open this post in threaded view
|

flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

鱼子酱
This post was updated on .
Hi,社区的各位大家好:
我目前生产上面使用的是1.8.2版本,相对稳定
为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
截止目前先后研究了1.10.1 1.11.1共2个大版本

在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
状态后端使用的是rocksdb 的增量模式
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/",true);
设置了官网文档中找到的删除策略:
        TableConfig tableConfig = streamTableEnvironment.getConfig();
        tableConfig.setIdleStateRetentionTime(Time.minutes(2),
Time.minutes(7));      

请问是我使用的方式不对吗?

通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator



版本影响:flink1.10.1 flink1.11.1
planner:blink planner
source : kafka source
时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


               

               
sql:
insert into  result
    select request_time ,request_id ,request_cnt ,avg_resptime
,stddev_resptime ,terminal_cnt
,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from
    (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
        ,commandId as request_id
        ,count(*) as request_cnt
        ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
        ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
        from log
        where
        commandId in (111111)
        and errCode=0 and attr=0
        group by TUMBLE(times, INTERVAL '1' MINUTE),commandId

    union all

    select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
        ,99999999
        ,count(*) as request_cnt
        ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
        ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
        from log
        where
        commandId in (111111)
        and errCode=0 and attr=0
        group by TUMBLE(times, INTERVAL '1' MINUTE)
    )

       
source:

    create table log (
      eventTime bigint
      ,times timestamp(3)
          ……………………
      ,commandId integer
      ,watermark for times as times - interval '5' second
    )
    with(
     'connector' = 'kafka-0.10',
     'topic' = '……',
     'properties.bootstrap.servers' = '……',
     'properties.group.id' = '……',
     'scan.startup.mode' = 'latest-offset',
     'format' = 'json'
    )

sink1:
create table result (
      request_time varchar
      ,request_id integer
      ,request_cnt bigint
      ,avg_resptime double
      ,stddev_resptime double
      ,insert_time varchar
    ) with (
      'connector' = 'kafka-0.10',
      'topic' = '……',
      'properties.bootstrap.servers' = '……',
      'properties.group.id' = '……',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    )
       




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Congxian Qiu
Hi
    SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state 越来越大的情况,或许可以检查下
watermark[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html

Best,
Congxian


鱼子酱 <[hidden email]> 于2020年7月28日周二 下午2:45写道:

> Hi,社区的各位大家好:
> 我目前生产上面使用的是1.8.2版本,相对稳定
> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>
> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
> 状态后端使用的是rocksdb 的增量模式
> StateBackend backend =new
> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
> 设置了官网文档中找到的删除策略:
>         TableConfig tableConfig = streamTableEnvironment.getConfig();
>         tableConfig.setIdleStateRetentionTime(Time.minutes(2),
> Time.minutes(7));
>
> 请问是我使用的方式不对吗?
>
> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>
>
>
> 版本影响:flink1.10.1 flink1.11.1
> planner:blink planner
> source : kafka source
> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
>
>
>
> sql:
> insert into  result
>     select request_time ,request_id ,request_cnt ,avg_resptime
> ,stddev_resptime ,terminal_cnt
> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from
>     (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>         ,commandId as request_id
>         ,count(*) as request_cnt
>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
>         from log
>         where
>         commandId in (104005 ,204005 ,404005)
>         and errCode=0 and attr=0
>         group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>
>         union all
>
>         select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>         ,99999999
>         ,count(*) as request_cnt
>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
>         from log
>         where
>         commandId in (104005 ,204005 ,404005)
>         and errCode=0 and attr=0
>         group by TUMBLE(times, INTERVAL '1' MINUTE)
>     )
>
>
> source:
>
>     create table log (
>       eventTime bigint
>       ,times timestamp(3)
>           ……………………
>       ,commandId integer
>       ,watermark for times as times - interval '5' second
>     )
>     with(
>      'connector' = 'kafka-0.10',
>      'topic' = '……',
>      'properties.bootstrap.servers' = '……',
>      'properties.group.id' = '……',
>      'scan.startup.mode' = 'latest-offset',
>      'format' = 'json'
>     )
>
> sink1:
> create table result (
>       request_time varchar
>       ,request_id integer
>       ,request_cnt bigint
>       ,avg_resptime double
>       ,stddev_resptime double
>       ,insert_time varchar
>     ) with (
>       'connector' = 'kafka-0.10',
>       'topic' = '……',
>       'properties.bootstrap.servers' = '……',
>       'properties.group.id' = '……',
>       'scan.startup.mode' = 'latest-offset',
>       'format' = 'json'
>     )
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

鱼子酱
您好:

我按照您说的试了看了一下watermark,
发现可以 正常更新,相关的计算结果也没发现问题。
1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
<http://apache-flink.147419.n8.nabble.com/file/t793/111.png>
2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
<http://apache-flink.147419.n8.nabble.com/file/t793/333.png>
<http://apache-flink.147419.n8.nabble.com/file/t793/222.png>



Congxian Qiu wrote

> Hi
>     SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state 越来越大的情况,或许可以检查下
> watermark[1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
>
> Best,
> Congxian
>
>
> 鱼子酱 <

> 384939718@

>> 于2020年7月28日周二 下午2:45写道:
>
>> Hi,社区的各位大家好:
>> 我目前生产上面使用的是1.8.2版本,相对稳定
>> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>>
>> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> 状态后端使用的是rocksdb 的增量模式
>> StateBackend backend =new
>> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> 设置了官网文档中找到的删除策略:
>>         TableConfig tableConfig = streamTableEnvironment.getConfig();
>>         tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> Time.minutes(7));
>>
>> 请问是我使用的方式不对吗?
>>
>> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>>
>>
>>
>> 版本影响:flink1.10.1 flink1.11.1
>> planner:blink planner
>> source : kafka source
>> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>
>>
>>
>>
>> sql:
>> insert into  result
>>     select request_time ,request_id ,request_cnt ,avg_resptime
>> ,stddev_resptime ,terminal_cnt
>> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)
>> from
>>     (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>>         ,commandId as request_id
>>         ,count(*) as request_cnt
>>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> stddev_resptime
>>         from log
>>         where
>>         commandId in (104005 ,204005 ,404005)
>>         and errCode=0 and attr=0
>>         group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>>
>>         union all
>>
>>         select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>>         ,99999999
>>         ,count(*) as request_cnt
>>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> stddev_resptime
>>         from log
>>         where
>>         commandId in (104005 ,204005 ,404005)
>>         and errCode=0 and attr=0
>>         group by TUMBLE(times, INTERVAL '1' MINUTE)
>>     )
>>
>>
>> source:
>>
>>     create table log (
>>       eventTime bigint
>>       ,times timestamp(3)
>>           ……………………
>>       ,commandId integer
>>       ,watermark for times as times - interval '5' second
>>     )
>>     with(
>>      'connector' = 'kafka-0.10',
>>      'topic' = '……',
>>      'properties.bootstrap.servers' = '……',
>>      'properties.group.id' = '……',
>>      'scan.startup.mode' = 'latest-offset',
>>      'format' = 'json'
>>     )
>>
>> sink1:
>> create table result (
>>       request_time varchar
>>       ,request_id integer
>>       ,request_cnt bigint
>>       ,avg_resptime double
>>       ,stddev_resptime double
>>       ,insert_time varchar
>>     ) with (
>>       'connector' = 'kafka-0.10',
>>       'topic' = '……',
>>       'properties.bootstrap.servers' = '……',
>>       'properties.group.id' = '……',
>>       'scan.startup.mode' = 'latest-offset',
>>       'format' = 'json'
>>     )
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Benchao Li-2
这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。
这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下,
state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。

鱼子酱 <[hidden email]> 于2020年7月29日周三 上午9:47写道:

> 您好:
>
> 我按照您说的试了看了一下watermark,
> 发现可以 正常更新,相关的计算结果也没发现问题。
> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
> <http://apache-flink.147419.n8.nabble.com/file/t793/111.png>
> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
> <http://apache-flink.147419.n8.nabble.com/file/t793/333.png>
> <http://apache-flink.147419.n8.nabble.com/file/t793/222.png>
>
>
>
> Congxian Qiu wrote
> > Hi
> >     SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
> 越来越大的情况,或许可以检查下
> > watermark[1]
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
> >
> > Best,
> > Congxian
> >
> >
> > 鱼子酱 <
>
> > 384939718@
>
> >> 于2020年7月28日周二 下午2:45写道:
> >
> >> Hi,社区的各位大家好:
> >> 我目前生产上面使用的是1.8.2版本,相对稳定
> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
> >>
> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
> >> 状态后端使用的是rocksdb 的增量模式
> >> StateBackend backend =new
> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
> >> 设置了官网文档中找到的删除策略:
> >>         TableConfig tableConfig = streamTableEnvironment.getConfig();
> >>         tableConfig.setIdleStateRetentionTime(Time.minutes(2),
> >> Time.minutes(7));
> >>
> >> 请问是我使用的方式不对吗?
> >>
> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
> >>
> >>
> >>
> >> 版本影响:flink1.10.1 flink1.11.1
> >> planner:blink planner
> >> source : kafka source
> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>
> >>
> >>
> >>
> >>
> >> sql:
> >> insert into  result
> >>     select request_time ,request_id ,request_cnt ,avg_resptime
> >> ,stddev_resptime ,terminal_cnt
> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)
> >> from
> >>     (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >>         ,commandId as request_id
> >>         ,count(*) as request_cnt
> >>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >>         from log
> >>         where
> >>         commandId in (104005 ,204005 ,404005)
> >>         and errCode=0 and attr=0
> >>         group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
> >>
> >>         union all
> >>
> >>         select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >>         ,99999999
> >>         ,count(*) as request_cnt
> >>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >>         from log
> >>         where
> >>         commandId in (104005 ,204005 ,404005)
> >>         and errCode=0 and attr=0
> >>         group by TUMBLE(times, INTERVAL '1' MINUTE)
> >>     )
> >>
> >>
> >> source:
> >>
> >>     create table log (
> >>       eventTime bigint
> >>       ,times timestamp(3)
> >>           ……………………
> >>       ,commandId integer
> >>       ,watermark for times as times - interval '5' second
> >>     )
> >>     with(
> >>      'connector' = 'kafka-0.10',
> >>      'topic' = '……',
> >>      'properties.bootstrap.servers' = '……',
> >>      'properties.group.id' = '……',
> >>      'scan.startup.mode' = 'latest-offset',
> >>      'format' = 'json'
> >>     )
> >>
> >> sink1:
> >> create table result (
> >>       request_time varchar
> >>       ,request_id integer
> >>       ,request_cnt bigint
> >>       ,avg_resptime double
> >>       ,stddev_resptime double
> >>       ,insert_time varchar
> >>     ) with (
> >>       'connector' = 'kafka-0.10',
> >>       'topic' = '……',
> >>       'properties.bootstrap.servers' = '……',
> >>       'properties.group.id' = '……',
> >>       'scan.startup.mode' = 'latest-offset',
> >>       'format' = 'json'
> >>     )
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Dream-底限
In reply to this post by 鱼子酱
hi 鱼子酱、
我当初这样用的时候状态也不清理(子查询+时间窗口+union),后来把时间窗口改成全局group函数,union改成订阅topic列表后,设置状态过期时间状态才清理。。。
后来看资料有的说分区数据不均衡导致水印不推进的话可能导致这种状态不清理的问题,但是我感觉不是水印导致的,水印导致的窗口应该不触发计算吧,感觉这里面有些bug,需要专业人士定位一下。。。。

鱼子酱 <[hidden email]> 于2020年7月29日周三 上午9:53写道:

> 您好:
>
> 我按照您说的试了看了一下watermark,
> 发现可以 正常更新,相关的计算结果也没发现问题。
> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
> <http://apache-flink.147419.n8.nabble.com/file/t793/111.png>
> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
> <http://apache-flink.147419.n8.nabble.com/file/t793/333.png>
> <http://apache-flink.147419.n8.nabble.com/file/t793/222.png>
>
>
>
> Congxian Qiu wrote
> > Hi
> >     SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
> 越来越大的情况,或许可以检查下
> > watermark[1]
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
> >
> > Best,
> > Congxian
> >
> >
> > 鱼子酱 <
>
> > 384939718@
>
> >> 于2020年7月28日周二 下午2:45写道:
> >
> >> Hi,社区的各位大家好:
> >> 我目前生产上面使用的是1.8.2版本,相对稳定
> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
> >>
> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
> >> 状态后端使用的是rocksdb 的增量模式
> >> StateBackend backend =new
> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
> >> 设置了官网文档中找到的删除策略:
> >>         TableConfig tableConfig = streamTableEnvironment.getConfig();
> >>         tableConfig.setIdleStateRetentionTime(Time.minutes(2),
> >> Time.minutes(7));
> >>
> >> 请问是我使用的方式不对吗?
> >>
> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
> >>
> >>
> >>
> >> 版本影响:flink1.10.1 flink1.11.1
> >> planner:blink planner
> >> source : kafka source
> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>
> >>
> >>
> >>
> >>
> >> sql:
> >> insert into  result
> >>     select request_time ,request_id ,request_cnt ,avg_resptime
> >> ,stddev_resptime ,terminal_cnt
> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)
> >> from
> >>     (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >>         ,commandId as request_id
> >>         ,count(*) as request_cnt
> >>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >>         from log
> >>         where
> >>         commandId in (104005 ,204005 ,404005)
> >>         and errCode=0 and attr=0
> >>         group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
> >>
> >>         union all
> >>
> >>         select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >>         ,99999999
> >>         ,count(*) as request_cnt
> >>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >>         from log
> >>         where
> >>         commandId in (104005 ,204005 ,404005)
> >>         and errCode=0 and attr=0
> >>         group by TUMBLE(times, INTERVAL '1' MINUTE)
> >>     )
> >>
> >>
> >> source:
> >>
> >>     create table log (
> >>       eventTime bigint
> >>       ,times timestamp(3)
> >>           ……………………
> >>       ,commandId integer
> >>       ,watermark for times as times - interval '5' second
> >>     )
> >>     with(
> >>      'connector' = 'kafka-0.10',
> >>      'topic' = '……',
> >>      'properties.bootstrap.servers' = '……',
> >>      'properties.group.id' = '……',
> >>      'scan.startup.mode' = 'latest-offset',
> >>      'format' = 'json'
> >>     )
> >>
> >> sink1:
> >> create table result (
> >>       request_time varchar
> >>       ,request_id integer
> >>       ,request_cnt bigint
> >>       ,avg_resptime double
> >>       ,stddev_resptime double
> >>       ,insert_time varchar
> >>     ) with (
> >>       'connector' = 'kafka-0.10',
> >>       'topic' = '……',
> >>       'properties.bootstrap.servers' = '……',
> >>       'properties.group.id' = '……',
> >>       'scan.startup.mode' = 'latest-offset',
> >>       'format' = 'json'
> >>     )
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

鱼子酱
In reply to this post by Benchao Li-2
感谢!

flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

        StateBackend backend =new
FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

这样看,有可能是RocksDBStateBackend*增量模式*这边可能存在一些问题。

下面两种都能成功清理
RocksDBStateBackend:
<http://apache-flink.147419.n8.nabble.com/file/t793/444.png>

FsStateBackend:
<http://apache-flink.147419.n8.nabble.com/file/t793/555.png>


Benchao Li-2 wrote
> 这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。
> 这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下,
> state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。
>
> 鱼子酱 <

> 384939718@

>> 于2020年7月29日周三 上午9:47写道:
>
>> 您好:
>>
>> 我按照您说的试了看了一下watermark,
>> 发现可以 正常更新,相关的计算结果也没发现问题。
>> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
>> &lt;http://apache-flink.147419.n8.nabble.com/file/t793/111.png&gt;
>> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
>> &lt;http://apache-flink.147419.n8.nabble.com/file/t793/333.png&gt;
>> &lt;http://apache-flink.147419.n8.nabble.com/file/t793/222.png&gt;
>>
>>
>>
>> Congxian Qiu wrote
>> > Hi
>> >     SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
>> 越来越大的情况,或许可以检查下
>> > watermark[1]
>> >
>> > [1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > 鱼子酱 <
>>
>> > 384939718@
>>
>> >> 于2020年7月28日周二 下午2:45写道:
>> >
>> >> Hi,社区的各位大家好:
>> >> 我目前生产上面使用的是1.8.2版本,相对稳定
>> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>> >>
>> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> >> 状态后端使用的是rocksdb 的增量模式
>> >> StateBackend backend =new
>> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> >> 设置了官网文档中找到的删除策略:
>> >>         TableConfig tableConfig = streamTableEnvironment.getConfig();
>> >>         tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> >> Time.minutes(7));
>> >>
>> >> 请问是我使用的方式不对吗?
>> >>
>> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>> >>
>> >>
>> >>
>> >> 版本影响:flink1.10.1 flink1.11.1
>> >> planner:blink planner
>> >> source : kafka source
>> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> sql:
>> >> insert into  result
>> >>     select request_time ,request_id ,request_cnt ,avg_resptime
>> >> ,stddev_resptime ,terminal_cnt
>> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)
>> >> from
>> >>     (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >>         ,commandId as request_id
>> >>         ,count(*) as request_cnt
>> >>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >>         from log
>> >>         where
>> >>         commandId in (104005 ,204005 ,404005)
>> >>         and errCode=0 and attr=0
>> >>         group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>> >>
>> >>         union all
>> >>
>> >>         select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >>         ,99999999
>> >>         ,count(*) as request_cnt
>> >>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >>         from log
>> >>         where
>> >>         commandId in (104005 ,204005 ,404005)
>> >>         and errCode=0 and attr=0
>> >>         group by TUMBLE(times, INTERVAL '1' MINUTE)
>> >>     )
>> >>
>> >>
>> >> source:
>> >>
>> >>     create table log (
>> >>       eventTime bigint
>> >>       ,times timestamp(3)
>> >>           ……………………
>> >>       ,commandId integer
>> >>       ,watermark for times as times - interval '5' second
>> >>     )
>> >>     with(
>> >>      'connector' = 'kafka-0.10',
>> >>      'topic' = '……',
>> >>      'properties.bootstrap.servers' = '……',
>> >>      'properties.group.id' = '……',
>> >>      'scan.startup.mode' = 'latest-offset',
>> >>      'format' = 'json'
>> >>     )
>> >>
>> >> sink1:
>> >> create table result (
>> >>       request_time varchar
>> >>       ,request_id integer
>> >>       ,request_cnt bigint
>> >>       ,avg_resptime double
>> >>       ,stddev_resptime double
>> >>       ,insert_time varchar
>> >>     ) with (
>> >>       'connector' = 'kafka-0.10',
>> >>       'topic' = '……',
>> >>       'properties.bootstrap.servers' = '……',
>> >>       'properties.group.id' = '……',
>> >>       'scan.startup.mode' = 'latest-offset',
>> >>       'format' = 'json'
>> >>     )
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>
>
> --
>
> Best,
> Benchao Li





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Congxian Qiu
In reply to this post by Benchao Li-2
Hi   鱼子酱
    能否把在使用增量 checkpoint 的模式下,截图看一下 checkpoint size 的走势呢?另外可以的话,也麻烦你在每次
checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
    另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?

Best,
Congxian


鱼子酱 <[hidden email]> 于2020年7月30日周四 上午10:43写道:

> 感谢!
>
> flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
> 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
> StateBackend backend =new
>
> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
> StateBackend backend =new
>
> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>
>
> 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
> RocksDBStateBackend:
> <http://apache-flink.147419.n8.nabble.com/file/t793/444.png>
> FsStateBackend:
> <http://apache-flink.147419.n8.nabble.com/file/t793/555.png>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

鱼子酱
hi,您好:
我改回增量模式重新收集了一些数据:
1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
2、checkpoint是interval设置的是5秒
3、目前这个作业是每分钟一个窗口
4、并行度设置的1,使用on-yarn模式

刚启动的时候,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/6.png>

18分钟后,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/9.png>

checkpoints设置:
<http://apache-flink.147419.n8.nabble.com/file/t793/conf.png>

hdfs上面大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png>

页面上看到的大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png>


Congxian Qiu wrote

> Hi   鱼子酱
>     能否把在使用增量 checkpoint 的模式下,截图看一下 checkpoint size 的走势呢?另外可以的话,也麻烦你在每次
> checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
>     另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
>
> Best,
> Congxian
>
>
> 鱼子酱 <

> 384939718@

>> 于2020年7月30日周四 上午10:43写道:
>
>> 感谢!
>>
>> flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
>> 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
>> StateBackend backend =new
>>
>> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>> StateBackend backend =new
>>
>> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>>
>>
>> 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
>> RocksDBStateBackend:
>> &lt;http://apache-flink.147419.n8.nabble.com/file/t793/444.png&gt;
>> FsStateBackend:
>> &lt;http://apache-flink.147419.n8.nabble.com/file/t793/555.png&gt;
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/
op
Reply | Threaded
Open this post in threaded view
|

回复: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

op
&nbsp; &nbsp; 同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
逻辑是按照 事件day 和 id 进行groupby 然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1440+10))




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年8月3日(星期一) 中午1:50
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大



hi,您好:
我改回增量模式重新收集了一些数据:
1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
2、checkpoint是interval设置的是5秒
3、目前这个作业是每分钟一个窗口
4、并行度设置的1,使用on-yarn模式

刚启动的时候,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/6.png&gt; 

18分钟后,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/9.png&gt; 

checkpoints设置:
<http://apache-flink.147419.n8.nabble.com/file/t793/conf.png&gt; 

hdfs上面大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png&gt; 

页面上看到的大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png&gt; 


Congxian Qiu wrote
&gt; Hi&nbsp;&nbsp; 鱼子酱
&gt;&nbsp;&nbsp;&nbsp;&nbsp; 能否把在使用增量 checkpoint 的模式下,截图看一下 checkpoint size 的走势呢?另外可以的话,也麻烦你在每次
&gt; checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
&gt;&nbsp;&nbsp;&nbsp;&nbsp; 另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
&gt;
&gt; Best,
&gt; Congxian
&gt;
&gt;
&gt; 鱼子酱 <

&gt; 384939718@

&gt;&gt; 于2020年7月30日周四 上午10:43写道:
&gt;
&gt;&gt; 感谢!
&gt;&gt;
&gt;&gt; flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
&gt;&gt; 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
&gt;&gt; StateBackend backend =new
&gt;&gt;
&gt;&gt; RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
&gt;&gt; StateBackend backend =new
&gt;&gt;
&gt;&gt; FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
&gt;&gt;
&gt;&gt;
&gt;&gt; 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
&gt;&gt; RocksDBStateBackend:
&gt;&gt; &amp;lt;http://apache-flink.147419.n8.nabble.com/file/t793/444.png&amp;gt;
&gt;&gt; FsStateBackend:
&gt;&gt; &amp;lt;http://apache-flink.147419.n8.nabble.com/file/t793/555.png&amp;gt;
&gt;&gt;
&gt;&gt;
&gt;&gt;
&gt;&gt;
&gt;&gt; --
&gt;&gt; Sent from: http://apache-flink.147419.n8.nabble.com/
&gt;&gt;





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Congxian Qiu
Hi
   能否把 checkpoint 的 interval 调长一点再看看是否稳定呢?从 shared 目录的数据量看,有增长,后续基本持平。现在
Checkpointed Data Size 是增量的大小[1],而不是整个 checkpoint 的数据量的大小,如果 checkpoint
之间,数据改动很多的话,这个值会变大

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
Best,
Congxian


op <[hidden email]> 于2020年8月3日周一 下午2:18写道:

> &nbsp; &nbsp;
> 同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
> 逻辑是按照 事件day 和 id 进行groupby
> 然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
> tConfig.setIdleStateRetentionTime(Time.minutes(1440),
> Time.minutes(1440+10))
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> [hidden email]&gt;;
> 发送时间:&nbsp;2020年8月3日(星期一) 中午1:50
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
>
>
> hi,您好:
> 我改回增量模式重新收集了一些数据:
> 1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
> 2、checkpoint是interval设置的是5秒
> 3、目前这个作业是每分钟一个窗口
> 4、并行度设置的1,使用on-yarn模式
>
> 刚启动的时候,如下:
> <http://apache-flink.147419.n8.nabble.com/file/t793/6.png&gt;
>
> 18分钟后,如下:
> <http://apache-flink.147419.n8.nabble.com/file/t793/9.png&gt;
>
> checkpoints设置:
> <http://apache-flink.147419.n8.nabble.com/file/t793/conf.png&gt;
>
> hdfs上面大小:
> <http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png&gt;
>
> 页面上看到的大小:
> <http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png&gt;
>
>
> Congxian Qiu wrote
> &gt; Hi&nbsp;&nbsp; 鱼子酱
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; 能否把在使用增量 checkpoint 的模式下,截图看一下 checkpoint
> size 的走势呢?另外可以的话,也麻烦你在每次
> &gt; checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; 另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
> &gt;
> &gt; Best,
> &gt; Congxian
> &gt;
> &gt;
> &gt; 鱼子酱 <
>
> &gt; 384939718@
>
> &gt;&gt; 于2020年7月30日周四 上午10:43写道:
> &gt;
> &gt;&gt; 感谢!
> &gt;&gt;
> &gt;&gt; flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
> &gt;&gt; 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
> &gt;&gt; StateBackend backend =new
> &gt;&gt;
> &gt;&gt;
> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
> &gt;&gt; StateBackend backend =new
> &gt;&gt;
> &gt;&gt;
> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
> &gt;&gt;
> &gt;&gt;
> &gt;&gt; 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
> &gt;&gt; RocksDBStateBackend:
> &gt;&gt; &amp;lt;
> http://apache-flink.147419.n8.nabble.com/file/t793/444.png&amp;gt;
> &gt;&gt; FsStateBackend:
> &gt;&gt; &amp;lt;
> http://apache-flink.147419.n8.nabble.com/file/t793/555.png&amp;gt;
> &gt;&gt;
> &gt;&gt;
> &gt;&gt;
> &gt;&gt;
> &gt;&gt; --
> &gt;&gt; Sent from: http://apache-flink.147419.n8.nabble.com/
> &gt;&gt;
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Evan
In reply to this post by 鱼子酱
HI:
    请问这个问题有合理的解释吗,持续关注中。。。
 
发件人: 鱼子酱
发送时间: 2020-08-03 13:50
收件人: user-zh
主题: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
hi,您好:
我改回增量模式重新收集了一些数据:
1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
2、checkpoint是interval设置的是5秒
3、目前这个作业是每分钟一个窗口
4、并行度设置的1,使用on-yarn模式
 
刚启动的时候,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/6.png>
 
18分钟后,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/9.png>
 
checkpoints设置:
<http://apache-flink.147419.n8.nabble.com/file/t793/conf.png>
 
hdfs上面大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png>
 
页面上看到的大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png>
 
 
Congxian Qiu wrote

> Hi   鱼子酱
>     能否把在使用增量 checkpoint 的模式下,截图看一下 checkpoint size 的走势呢?另外可以的话,也麻烦你在每次
> checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
>     另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
>
> Best,
> Congxian
>
>
> 鱼子酱 <
 
> 384939718@
 

>> 于2020年7月30日周四 上午10:43写道:
>
>> 感谢!
>>
>> flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
>> 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
>> StateBackend backend =new
>>
>> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>> StateBackend backend =new
>>
>> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>>
>>
>> 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
>> RocksDBStateBackend:
>> http://apache-flink.147419.n8.nabble.com/file/t793/444.png&gt;
>> FsStateBackend:
>> http://apache-flink.147419.n8.nabble.com/file/t793/555.png&gt;
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
 
 
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/
 
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Yun Tang
Hi

RocksDB 的文件更新策略是依赖于level-0的文件数目以及level-1 ~ level-N的每层文件总size,可以参照RocksDB社区关于conpaction的图文描述[1]。默认情况下level-1的target size 是256MB [2],也就是说level-1的总size在256MB以下时,应该是没有触发compaction来降低文件大小的。

从你的UI截图看,实际上checkpoint size很小,建议达到一定数据规模之后,再观察是否“状态越来越大”

[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#structure-of-the-files
[2] http://www.leviathan.vip/2018/03/05/Rocksdb%E7%9A%84Compact/

祝好
唐云

________________________________
From: [hidden email] <[hidden email]>
Sent: Friday, August 7, 2020 10:32
To: user-zh <[hidden email]>
Subject: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

HI:
    请问这个问题有合理的解释吗,持续关注中。。。

发件人: 鱼子酱
发送时间: 2020-08-03 13:50
收件人: user-zh
主题: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
hi,您好:
我改回增量模式重新收集了一些数据:
1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
2、checkpoint是interval设置的是5秒
3、目前这个作业是每分钟一个窗口
4、并行度设置的1,使用on-yarn模式

刚启动的时候,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/6.png>

18分钟后,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/9.png>

checkpoints设置:
<http://apache-flink.147419.n8.nabble.com/file/t793/conf.png>

hdfs上面大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png>

页面上看到的大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png>


Congxian Qiu wrote

> Hi   鱼子酱
>     能否把在使用增量 checkpoint 的模式下,截图看一下 checkpoint size 的走势呢?另外可以的话,也麻烦你在每次
> checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
>     另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
>
> Best,
> Congxian
>
>
> 鱼子酱 <

> 384939718@

>> 于2020年7月30日周四 上午10:43写道:
>
>> 感谢!
>>
>> flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
>> 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
>> StateBackend backend =new
>>
>> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>> StateBackend backend =new
>>
>> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>>
>>
>> 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
>> RocksDBStateBackend:
>> http://apache-flink.147419.n8.nabble.com/file/t793/444.png&gt;
>> FsStateBackend:
>> http://apache-flink.147419.n8.nabble.com/file/t793/555.png&gt;
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

zhuyuping
In reply to this post by 鱼子酱
<http://apache-flink.147419.n8.nabble.com/file/t912/e1111e0a-24c3-4e40-b830-1eec9f183a02.png>
<http://apache-flink.147419.n8.nabble.com/file/t912/2d1a5ed7-6f46-4091-b65e-9c2c26e5835e.png>

我也出现了这个问题, 我使用的是窗口函数进行group by

发现state 不会清空,还是10m 到后面 几G 缓慢增长,大概每3个checkpoint 增长
任务没有反压。为了测试我使用discardSink

先后 换了 1 second 1分钟,还有proctime rowtime模式 来窗口统计都一样 ,state缓慢增大

CREATE VIEW cpd_expo_feature_collect_view as select
imei,incrmentFeatureCollect(CAST(serverTime AS INT),adId) as feature from
dwd_oth_ads_appstore_exposure_process_view
  group by TUMBLE (proctime, INTERVAL '10' SECOND),imei;



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

zhuyuping
In reply to this post by Benchao Li-2
我这边出现同样的问题,我换成了filesystem 发现state 还是一样缓慢增大,所以应该跟rocksdb 无关



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

zhuyuping
In reply to this post by 鱼子酱
同样出现了这个问题,SQL 使用中,请问是什么原因,翻转tumble窗口当使用mapview 进行操作时候,状态不断的增长
好像不能清理一样,因为正常的window 窗口 窗口结束后会清理状态,现在的情况是1秒的翻转tumble窗口,满满的从最开始的1m 过一个小时变成了1g
............不断的无限增长下去



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Danny Chan
能否提供下完整的 query,方便追踪和排查 ~

Best,
Danny Chan
在 2020年8月31日 +0800 AM10:58,zhuyuping <[hidden email]>,写道:
> 同样出现了这个问题,SQL 使用中,请问是什么原因,翻转tumble窗口当使用mapview 进行操作时候,状态不断的增长
> 好像不能清理一样,因为正常的window 窗口 窗口结束后会清理状态,现在的情况是1秒的翻转tumble窗口,满满的从最开始的1m 过一个小时变成了1g
> ............不断的无限增长下去
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Storm☀️
In reply to this post by zhuyuping
state.backend.incremental 出现问题的时候增量模式是开启的吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

r pp
一般性的推断是,模式 是属于配置项,若出现问题了,系统读取 或者 改变 配置项,能解决问题么?
之前的学习经验,计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场。
状态 可以 类比是现场,当问题出现的时候,重点在状态的保护是怎么实现的,和配置没有太大关系,因为完全可以不读取配置。
配置项是面向用户选择 state 的方式,不是解决问题的方式。

Storm☀️ <[hidden email]> 于2020年12月18日周五 上午11:50写道:

> state.backend.incremental 出现问题的时候增量模式是开启的吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

r pp
In reply to this post by Storm☀️
一般性的推断是,模式 是属于配置项,若出现问题了,系统读取 或者 改变 配置项,能解决问题么?
之前的学习经验,计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场。
状态 可以 类比是现场,当问题出现的时候,重点在状态的保护是怎么实现的,和配置没有太大关系,因为完全可以不读取配置。
配置项是面向用户选择 state 的方式,不是解决问题的方式

Storm☀️ <[hidden email]> 于2020年12月18日周五 上午11:50写道:

> state.backend.incremental 出现问题的时候增量模式是开启的吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Storm☀️
In reply to this post by r pp
"计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场 "
 那么解决问题的方法是?生产上state还在不断膨胀。
简单一个问题,生产上发生OOM了,短时间内无法排查出原因,请问如何处理?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
12