版本:1.12.2
sql: SELECT id, name, message,ts SELECT ROW_NUMBER() OVER (PARTITION BY name ORDER BY ts DESC) AS rowNum FROM persons_message_table_kafka WHERE rowNum = 1 过期时间设置:tableEnv.getConfig().setIdleStateRetention(Duration.ofhour(3)); 问题:checkpoint数据一直在线上增加,一开始90m,然后每天增长20m,但是源数据并没有太多增长 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
你好,
sql 作业可以尝试设置作业参数 "table.exec.state.ttl" 观察下效果 另外开启 "state.backend.incremental" 也可以减少 checkpoint 的大小 参数说明: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#checkpoints-and-state-backends lincoln lee chenchencc <[hidden email]> 于2021年6月8日周二 下午3:11写道: > 版本:1.12.2 > sql: > SELECT id, name, message,ts > SELECT > ROW_NUMBER() OVER (PARTITION BY name > ORDER BY ts DESC) AS rowNum > FROM persons_message_table_kafka > WHERE rowNum = 1 > 过期时间设置:tableEnv.getConfig().setIdleStateRetention(Duration.ofhour(3)); > > 问题:checkpoint数据一直在线上增加,一开始90m,然后每天增长20m,但是源数据并没有太多增长 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
In reply to this post by chenchencc
建议关闭state.backend.incremental ,因为我遇到过,开启增量ckp导致ckp一直增大,关掉就正常了
-- Sent from: http://apache-flink.147419.n8.nabble.com/ |
checkpoint文件大小不断增加的原因是由于任务的状态不断累积导致的;所以如果任务状态很大的情况下,比如Group by 的字段过多等等,可以考虑开启增量state.backend.incremental,同时可以考虑任务的类型,如果任务是按天进行聚合指标的情况可以考虑设置状态过期清理时间idlestate.retention.time为一天等方式来防止chekcpoint保留状态数据的不断增加,或者增加速度过快导致任务的内存不够而被Kill掉; 但是看您的描述,并不是设置State TTL不生效,而是要考虑状态时间戳的更新方式,因为状态时间戳被更新存在两种模式StateTtlConfig.UpdateType.OnCreateAndWrite - 只在创建和写的时候更新(默认),StateTtlConfig.UpdateType.OnReadAndWrite - 在读和写的时候更新,所以可以考虑您的任务情况采用哪种设定状态的更新模式; 同时过期数据的清理策略和您设定的checkpoint保留是增量、全量或者增量RocksDB保留的策略都有关了,您可以综合考虑自己的checkpoint保留策略和任务类型合理设定状态过期清理时间idlestate.retention.time 和状态时间戳的更新方式以及任务的checkpoint的保留策略 Best , JasonLee | | 李闯 | | [hidden email] | 签名由网易邮箱大师定制 在2021年06月10日 12:33,HunterXHunter<[hidden email]> 写道: 建议关闭state.backend.incremental ,因为我遇到过,开启增量ckp导致ckp一直增大,关掉就正常了 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
This post was updated on .
嗯嗯,我这边排查看到是我是用temporary left join
维度表,使用事件时间,但是我期望维度表只保留3小时。目前使用on加上时间范围,貌似不生效,导致join的状态不断增加。有什么方式能处理吗,保留维度表一段时间数据。 嗯嗯,我这边排查看到是我是用temporary left join 维度表,使用事件时间,但是我期望维度表只保留3小时。目前使用on加上时间范围,貌似不生效,导致join的状态不断增加。有什么方式能处理吗,保留维度表一段时间数据。 SELECT id, name, message,ts SELECT ROW_NUMBER() OVER (PARTITION BY name ORDER BY ts DESC) AS rowNum FROM persons_message_table_kafka WHERE rowNum = 1 SELECT id, name, message,ts SELECT ROW_NUMBER() OVER (PARTITION BY name ORDER BY ts DESC) AS rowNum FROM persons_message_table_kafka WHERE rowNum = 1 SELECT id, name, message,ts SELECT ROW_NUMBER() OVER (PARTITION BY name ORDER BY ts DESC) AS rowNum FROM persons_message_table_kafka WHERE rowNum = 1 tableEnv.executeSql("create TEMPORARY view " + " dc_session_action_domain_config_flink_view_prod as " + "select t1.sess_id,t2.domain,t2.site,t1.add_time,t1.ts,t1.proctime\n" + "from test.action t1\n" + " left join test.session FOR SYSTEM_TIME AS OF t1.ts AS t2\n" + " on t1.sess_id = t2.sess_id " + " and (t1.ts between t2.ts and t2.ts + INTERVAL '3' HOUR) \n" ); 这个dc_session_action_domain_config_flink_view_prod join算子状态不断增加,但是本身源数据并没有增加。 Sent from: http://apache-flink.147419.n8.nabble.com/ -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
保留一段时间,就不是temporal join的语义了,你可以试试用interval join再做个去重,但是去重也有state的开销。
On Fri, 11 Jun 2021 at 10:44, chenchencc <[hidden email]> wrote: > 嗯嗯,我这边排查看到是我是用temporary left join > > 维度表,使用事件时间,但是我期望维度表只保留3小时。目前使用on加上时间范围,貌似不生效,导致join的状态不断增加。有什么方式能处理吗,保留维度表一段时间数据。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |