|
使用flink1.10。。开启了mini-batch和设置了idleStateRetentionTime,执行中会进行left join操作,最后sink的时候insert into table select a, LAST_VALUE(b) group by a;
有关配置如下:
val tConfig: TableConfig = tEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
val configuration = tEnv.getConfig().getConfiguration()
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
发现checkpointsize越来越大,设置的IdleStateRetentionTime对LAST_VALUE状态清理无效。
请问,有人遇到过这个问题吗?如何解决呢?
|