The SQL job is defined as follows. I also set the maximum and minimum idle state retention time via TableConfig, but after the job runs for a long time, the state under the RocksDB sst directory flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9 keeps growing. Reading and writing state becomes very slow for the task threads, and the job ends up permanently backpressured. Any advice would be appreciated.

CREATE TABLE yy_yapmnetwork_original (
    happenAt BIGINT,
    uid BIGINT,
    appId STRING,
    deviceId STRING,
    appVer STRING,
    dnsDur BIGINT,
    useGlb INT,
    hitCache INT,
    requestSize DOUBLE,
    responseSize DOUBLE,
    totalDur BIGINT,
    url STRING,
    statusCode INT,
    prototype STRING,
    netType STRING,
    traceId STRING,
    ts AS CAST(FROM_UNIXTIME(happenAt/1000) AS TIMESTAMP(3)),
    WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'yapm_metrics',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'kafkawx007-core001.yy.com:8101,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101',
    'connector.properties.group.id' = 'interface_success_rate_consumer',
    'connector.startup-mode' = 'latest-offset',
    'format.type' = 'json'
);

create table request_latency_tbl (
    app_id string,
    app_ver string,
    net_type string,
    prototype string,
    url string,
    status_code int,
    w_start string,
    success_cnt BIGINT,
    failure_cnt BIGINT,
    total_cnt BIGINT
) with (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnect=true',
    'connector.table' = 'request_latency_statistics',
    'connector.username' = 'yapm_metrics',
    'connector.password' = '1234456',
    'connector.write.flush.max-rows' = '1000',
    'connector.write.flush.interval' = '5s',
    'connector.write.max-retries' = '2'
);

create view request_1minutes_latency as
    select appId, appVer, netType, prototype, url, statusCode,
        DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm') w_start,
        count(distinct traceId) filter (where statusCode in (200)) as successCnt,
        count(distinct traceId) filter (where statusCode not in (200)) as failureCnt,
        count(distinct traceId) as total_cnt
    from yy_yapmnetwork_original
    group by appId, appVer, netType, prototype, url, statusCode,
        DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm');

insert into request_latency_tbl select * from request_1minutes_latency;
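For reference, the min/max idle state retention mentioned above is configured through TableConfig roughly as below on the Flink 1.10 Blink planner. This is a minimal sketch, not the poster's actual code: the class and variable names are placeholders, and the 10/15-minute values are taken from the figures reported further down this thread.

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // State idle for longer than the minimum becomes eligible for cleanup
        // and is guaranteed to be removed once the maximum has passed.
        // Flink requires the max to exceed the min by at least 5 minutes.
        tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(10), Time.minutes(15));

        // The DDL and INSERT statements above would then be submitted with
        // tEnv.sqlUpdate(...) followed by tEnv.execute(...) on Flink 1.10.
    }
}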
How about pasting the SQL as a gist link?
Best,
tison.
Here is the link: https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
Hi,
How long is your state retention time set to? For a regular GROUP BY aggregation operator, expired state is currently cleaned up via timers: the exact expiration time is the state's last update time plus the minimum idle time you configured, so state that keeps getting updated never expires. Also, how much data is coming through Kafka, roughly how many records per second? You could try setting a shorter retention time and observe whether the state size stays reasonably stable.

------------------------------
刘大龙
Zhejiang University
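To illustrate the timer-based cleanup described above, here is a minimal sketch of the pattern. It is hypothetical, not Flink's actual group-aggregation operator code (class and state names are made up): every incoming record re-arms the key's cleanup timer at now + minimum retention, which is exactly why state for a continuously updated key never expires.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: keep a running sum per key and clear it once the key has been
// idle for minRetentionMs.
public class TimerTtlSketch extends KeyedProcessFunction<String, Long, Long> {

    private final long minRetentionMs;
    private transient ValueState<Long> sum;          // the aggregate
    private transient ValueState<Long> cleanupTime;  // earliest time we may clear

    public TimerTtlSketch(long minRetentionMs) {
        this.minRetentionMs = minRetentionMs;
    }

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Long.class));
        cleanupTime = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cleanup-time", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        Long current = sum.value();
        sum.update(current == null ? value : current + value);
        out.collect(sum.value());

        // Every update pushes the expiry deadline forward, so a key that
        // keeps receiving records never reaches its cleanup deadline.
        long deadline = ctx.timerService().currentProcessingTime() + minRetentionMs;
        cleanupTime.update(deadline);
        ctx.timerService().registerProcessingTimeTimer(deadline);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Long deadline = cleanupTime.value();
        // Timers armed by older updates may still fire; only clear state
        // once the most recent deadline has actually passed.
        if (deadline != null && timestamp >= deadline) {
            sum.clear();
            cleanupTime.clear();
        }
    }
}

As far as I understand, the real operators use the min/max retention pair to avoid re-registering a timer on every single record; the sketch simplifies that, but the observable effect is the same.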
The retention time is 10 to 15 minutes. Since I group by a per-minute key, each key should stop being updated and expire quickly. Kafka traffic is about 2-5 MB per second.
Hi,
Could you describe your Flink version? For the exact semantics of idle state retention time, please see [1].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time

Best,
LakeShen
The version is 1.10. I aggregate with a per-minute group key, so once a minute has passed a key should receive no further updates, which should trigger cleanup of the expired state. Kafka inbound traffic is 2-5 MB/s.
In reply to this post by claylin
I am hitting the same problem on Flink 1.10.1: the TTL is set but does not take effect. Did you (the OP) ever solve this?
SQL:

select * from xx
group by TUMBLE(monitor_processtime, INTERVAL '60' SECOND), topic_identity

A 60-second window, with the expiration time set to 2 minutes, yet the state in checkpoints keeps growing:

tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(2), Time.minutes(5));