Idle state retention time is set in a SQL job, but the state keeps growing

8 messages

Idle state retention time is set in a SQL job, but the state keeps growing

claylin
The SQL job is defined as follows. I also set the maximum and minimum idle state retention time through TableConfig, but after the job has been running for a long time, the state under the SST directory flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9 keeps growing. Reading and writing state becomes very slow for the job threads, and the job ends up permanently backpressured. Any advice would be appreciated.

CREATE TABLE yy_yapmnetwork_original (
    happenAt BIGINT,
    uid BIGINT,
    appId STRING,
    deviceId STRING,
    appVer STRING,
    dnsDur BIGINT,
    useGlb INT,
    hitCache INT,
    requestSize DOUBLE,
    responseSize DOUBLE,
    totalDur BIGINT,
    url STRING,
    statusCode INT,
    prototype STRING,
    netType STRING,
    traceId STRING,
    ts AS CAST(FROM_UNIXTIME(happenAt/1000) AS TIMESTAMP(3)),
    WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'yapm_metrics',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'kafkawx007-core001.yy.com:8101,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101',
    'connector.properties.group.id' = 'interface_success_rate_consumer',
    'connector.startup-mode' = 'latest-offset',
    'format.type' = 'json'
);

CREATE TABLE request_latency_tbl (
    app_id STRING,
    app_ver STRING,
    net_type STRING,
    prototype STRING,
    url STRING,
    status_code INT,
    w_start STRING,
    success_cnt BIGINT,
    failure_cnt BIGINT,
    total_cnt BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnect=true',
    'connector.table' = 'request_latency_statistics',
    'connector.username' = 'yapm_metrics',
    'connector.password' = '1234456',
    'connector.write.flush.max-rows' = '1000',
    'connector.write.flush.interval' = '5s',
    'connector.write.max-retries' = '2'
);

CREATE VIEW request_1minutes_latency AS
    SELECT appId, appVer, netType, prototype, url, statusCode,
        DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm') w_start,
        count(distinct traceId) filter (where statusCode in (200)) as successCnt,
        count(distinct traceId) filter (where statusCode not in (200)) as failureCnt,
        count(distinct traceId) as total_cnt
    FROM yy_yapmnetwork_original
    GROUP BY appId, appVer, netType, prototype, url, statusCode, DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm');

INSERT INTO request_latency_tbl
    SELECT * FROM request_1minutes_latency;
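One structural reason the state can grow: every group key in the view includes DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm'), so each new wall-clock minute produces a fresh set of group keys, and count(distinct traceId) keeps a distinct-value set per key. Unless old keys are expired, the key space grows without bound. A minimal Python sketch of that effect (the url, minute, and trace-id values are illustrative, not taken from the actual topic):

```python
from collections import defaultdict

# state: group key -> set of distinct trace ids (mimicking COUNT(DISTINCT traceId) state)
state = defaultdict(set)

def process(url, minute, trace_id):
    # The group key embeds the minute, so a new minute means a brand-new key.
    key = (url, minute)
    state[key].add(trace_id)

# Same url across three consecutive minutes, one event each:
for minute in ("10:00", "10:01", "10:02"):
    process("/api/login", minute, "trace-" + minute)

# Without expiry, the number of keys equals the number of minutes seen so far.
print(len(state))  # 3
```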
Re: Idle state retention time is set in a SQL job, but the state keeps growing

tison
Would you consider posting the SQL as a gist link?

Best,
tison.



Re: Idle state retention time is set in a SQL job, but the state keeps growing

claylin
Link here: https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4





Re: Re: Idle state retention time is set in a SQL job, but the state keeps growing

刘大龙
Hi,
   How long is your state retention time set to? For a plain group-by aggregation operator, expired state is currently cleaned up via timers: the precise expiry time is the state's last update time plus the minimum idle time you configured, so state that keeps being updated never expires. Also, how much data is in your Kafka topic, say, roughly how many records per second? You could try setting a shorter retention time and observe whether the state size stays reasonably stable.
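The timer-based cleanup rule described above can be sketched as follows: each key's expiry is its last update time plus the minimum retention, so every new update pushes the expiry further out. This is an illustrative model of the described behavior, not Flink's actual implementation; the 10-minute value is a hypothetical setting:

```python
MIN_RETENTION = 10 * 60  # seconds; hypothetical 10-minute minimum idle time

last_update = {}  # key -> timestamp (seconds) of the key's last state update

def touch(key, now):
    # Every update postpones the key's expiry to now + MIN_RETENTION.
    last_update[key] = now

def expired_keys(now):
    # A key expires only once it has gone MIN_RETENTION without an update.
    return [k for k, t in last_update.items() if now - t >= MIN_RETENTION]

touch("key-a", 0)
touch("key-b", 0)
touch("key-a", 9 * 60)        # key-a updated again at t=9min: expiry pushed out

print(expired_keys(11 * 60))  # only key-b has been idle for >= 10 minutes
```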




------------------------------
刘大龙

Zhejiang University, Department of Control Science, Institute of Intelligent Systems and Control, Room 217, Industrial Control New Building
Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
Tel: 18867547281

Re: Re: Idle state retention time is set in a SQL job, but the state keeps growing

claylin
The retention time is 10 to 15 minutes. Since each minute is part of the group key, old keys should in principle expire quickly. Kafka traffic is about 2-5 MB per second.



Re: Re: Idle state retention time is set in a SQL job, but the state keeps growing

LakeShen
Hi,

Could you describe your Flink version? For the exact meaning of the idle state retention time, please refer to [1]:

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time

Best,
LakeShen




Re: Re: Idle state retention time is set in a SQL job, but the state keeps growing

claylin
The version is 1.10. I aggregate grouped by minute, so after a minute passes there should be no further updates to that key, which should trigger cleanup of the expired state. Kafka inbound traffic is 2-5 MB/s.





Re: Idle state retention time is set in a SQL job, but the state keeps growing

Storm☀️
In reply to this post by claylin
flink 1.10.1: I ran into the same problem. I set the TTL but it does not take effect. Did the original poster solve this?
*sql*:
select *
from xx
group by
    TUMBLE(monitor_processtime, INTERVAL '60' SECOND), topic_identity

*A 60s window with the retention time set to 2 minutes, but the state in checkpoints keeps growing*

*tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(2), Time.minutes(5));*
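Two things in the snippet above may be worth checking against the documentation (both are my reading of the Flink 1.10 docs, not confirmed in this thread): tumbling-window aggregations clean up their own state when the window fires and expires, so idle state retention mainly affects unbounded group-by aggregations; and setIdleStateRetentionTime is documented to require the difference between the minimum and maximum retention to be at least 5 minutes (or both zero), which Time.minutes(2)/Time.minutes(5), a 3-minute gap, would violate. A small stand-in for that validation rule:

```python
def check_retention(min_minutes, max_minutes):
    # Stand-in for the documented constraint on setIdleStateRetentionTime:
    # either cleanup is disabled (both zero), or max - min >= 5 minutes.
    if min_minutes == 0 and max_minutes == 0:
        return True
    return max_minutes - min_minutes >= 5

print(check_retention(2, 5))    # False: the 3-minute gap is too small
print(check_retention(10, 15))  # True
```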



--
Sent from: http://apache-flink.147419.n8.nabble.com/