使用flinksql提供的内置函数LAST_VALUE 发现存的state越来越大

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

使用flinksql提供的内置函数LAST_VALUE 发现存的state越来越大

guoliang_wang1335
使用flink1.10。。开启了mini-batch和设置了idleStateRetentionTime,执行中会进行left join操作,最后sink的时候insert into table select a, LAST_VALUE(b) group by a;
有关配置如下:
val tConfig: TableConfig = tEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
val configuration = tEnv.getConfig().getConfiguration()
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")

configuration.setString("table.exec.mini-batch.size", "5000")


发现checkpointsize越来越大,设置的IdleStateRetentionTime对LAST_VALUE状态清理无效。

请问,有人遇到过这个问题吗?如何解决呢?