我在flink sql中设置了 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); sql: select * from test t join test2 t2 on t.a=t2.a 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 |
Hi,
你用的是哪个版本呢? 在1.9版本里面的确是有点问题,默认没有开启cleanup in background [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-15938 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: > > > 我在flink sql中设置了 > tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); > sql: select * from test t join test2 t2 on t.a=t2.a > > 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: Hi, 你用的是哪个版本呢? 在1.9版本里面的确是有点问题,默认没有开启cleanup in background [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-15938 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: 我在flink sql中设置了 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); sql: select * from test t join test2 t2 on t.a=t2.a 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: Hi, 你用的是哪个版本呢? 在1.9版本里面的确是有点问题,默认没有开启cleanup in background [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-15938 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: 我在flink sql中设置了 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); sql: select * from test t join test2 t2 on t.a=t2.a 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。 我已经建了一个jira[1]来跟踪和改进这一点。 [1] https://issues.apache.org/jira/browse/FLINK-17199 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: > > > > > 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? > 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: > 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 > > > > > 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: > Hi, > > 你用的是哪个版本呢? > 在1.9版本里面的确是有点问题,默认没有开启cleanup in background > [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 > > [1] https://issues.apache.org/jira/browse/FLINK-15938 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: > > > > 我在flink sql中设置了 > tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); > sql: select * from test t join test2 t2 on t.a=t2.a > > > 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, 所以这个问题现在是不能完全避免了。 我已经建了一个jira[1]来跟踪和改进这一点。 [1] https://issues.apache.org/jira/browse/FLINK-17199 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: Hi, 你用的是哪个版本呢? 在1.9版本里面的确是有点问题,默认没有开启cleanup in background [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-15938 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: 我在flink sql中设置了 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); sql: select * from test t join test2 t2 on t.a=t2.a 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
这是两个问题,
- 状态只访问一次,可能不会清理。 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。 - 状态已经过期了,但是会被使用到。 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。 [1] https://issues.apache.org/jira/browse/FLINK-16581 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:06写道: > 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 > > > > > 在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: > 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, > 所以这个问题现在是不能完全避免了。 > 我已经建了一个jira[1]来跟踪和改进这一点。 > > [1] https://issues.apache.org/jira/browse/FLINK-17199 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: > > > > > > 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? > 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: > 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 > > > > > 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: > Hi, > > 你用的是哪个版本呢? > 在1.9版本里面的确是有点问题,默认没有开启cleanup in background > [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 > > [1] https://issues.apache.org/jira/browse/FLINK-15938 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: > > > > 我在flink sql中设置了 > tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); > sql: select * from test t join test2 t2 on t.a=t2.a > > > > 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期
在2020年4月17日 14:16,Benchao Li<[hidden email]> 写道: 这是两个问题, - 状态只访问一次,可能不会清理。 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。 - 状态已经过期了,但是会被使用到。 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。 [1] https://issues.apache.org/jira/browse/FLINK-16581 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:06写道: 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, 所以这个问题现在是不能完全避免了。 我已经建了一个jira[1]来跟踪和改进这一点。 [1] https://issues.apache.org/jira/browse/FLINK-17199 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: Hi, 你用的是哪个版本呢? 在1.9版本里面的确是有点问题,默认没有开启cleanup in background [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-15938 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: 我在flink sql中设置了 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); sql: select * from test t join test2 t2 on t.a=t2.a 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
嗯,blink planner跟legacy planner是有一些实现上的差异。
如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中: static StateTtlConfig createTtlConfig(long retentionTime, boolean stateCleaningEnabled) { if (stateCleaningEnabled) { checkArgument(retentionTime > 0); return StateTtlConfig .newBuilder(Time.milliseconds(retentionTime)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .cleanupInBackground() // added this line .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // changed this line .build(); } else { return StateTtlConfig.DISABLED; } } 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:47写道: > 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期 > > > > > 在2020年4月17日 14:16,Benchao Li<[hidden email]> 写道: > 这是两个问题, > > - 状态只访问一次,可能不会清理。 > > > 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。 > - 状态已经过期了,但是会被使用到。 > 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。 > > [1] https://issues.apache.org/jira/browse/FLINK-16581 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:06写道: > > 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 > > > > > 在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: > 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, > 所以这个问题现在是不能完全避免了。 > 我已经建了一个jira[1]来跟踪和改进这一点。 > > [1] https://issues.apache.org/jira/browse/FLINK-17199 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: > > > > > > 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? > 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: > 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 > > > > > 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: > Hi, > > 你用的是哪个版本呢? > 在1.9版本里面的确是有点问题,默认没有开启cleanup in background > [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 > > [1] https://issues.apache.org/jira/browse/FLINK-15938 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: > > > > 我在flink sql中设置了 > tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); > sql: select * from test t join test2 t2 on t.a=t2.a > > > > > 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
而且https://issues.apache.org/jira/browse/FLINK-15938 和
https://issues.apache.org/jira/browse/FLINK-16581 这两个issue现在已经都merge了,你也可以cherry-pick过去。 Benchao Li <[hidden email]> 于2020年4月17日周五 下午2:54写道: > 嗯,blink planner跟legacy planner是有一些实现上的差异。 > 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中: > > static StateTtlConfig createTtlConfig(long retentionTime, boolean stateCleaningEnabled) { > if (stateCleaningEnabled) { > checkArgument(retentionTime > 0); > return StateTtlConfig > .newBuilder(Time.milliseconds(retentionTime)) > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > .cleanupInBackground() // added this line > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // changed this line > .build(); > } else { > return StateTtlConfig.DISABLED; > } > } > > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:47写道: > >> 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期 >> >> >> >> >> 在2020年4月17日 14:16,Benchao Li<[hidden email]> 写道: >> 这是两个问题, >> >> - 状态只访问一次,可能不会清理。 >> >> >> 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。 >> - 状态已经过期了,但是会被使用到。 >> 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。 >> >> [1] https://issues.apache.org/jira/browse/FLINK-16581 >> >> 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:06写道: >> >> 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 >> >> >> >> >> 在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: >> 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, >> 所以这个问题现在是不能完全避免了。 >> 我已经建了一个jira[1]来跟踪和改进这一点。 >> >> [1] https://issues.apache.org/jira/browse/FLINK-17199 >> >> 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: >> >> >> >> >> >> 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? >> 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: >> 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 >> >> >> >> >> 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: >> Hi, >> >> 你用的是哪个版本呢? >> 在1.9版本里面的确是有点问题,默认没有开启cleanup in background >> [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 >> >> [1] https://issues.apache.org/jira/browse/FLINK-15938 >> >> 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: >> >> >> >> 我在flink sql中设置了 >> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); >> sql: select * from test t join test2 t2 on t.a=t2.a >> >> >> >> >> 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 >> >> >> >> -- >> >> Benchao Li >> School of Electronics Engineering and Computer Science, Peking University >> Tel:+86-15650713730 >> Email: [hidden email]; [hidden email] >> >> >> >> -- >> >> Benchao Li >> School of Electronics Engineering and Computer Science, Peking University >> Tel:+86-15650713730 >> Email: [hidden email]; [hidden email] >> >> >> >> -- >> >> Benchao Li >> School of Electronics Engineering and Computer Science, Peking University >> Tel:+86-15650713730 >> Email: [hidden email]; [hidden email] >> > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
In reply to this post by Benchao Li
我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));这种方式设置ttl
在2020年4月17日 14:54,Benchao Li<[hidden email]> 写道: 嗯,blink planner跟legacy planner是有一些实现上的差异。 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中: static StateTtlConfig createTtlConfig(long retentionTime, boolean stateCleaningEnabled) { if (stateCleaningEnabled) { checkArgument(retentionTime > 0); return StateTtlConfig .newBuilder(Time.milliseconds(retentionTime)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .cleanupInBackground() // added this line .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // changed this line .build(); } else { return StateTtlConfig.DISABLED; } } 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:47写道: 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期 在2020年4月17日 14:16,Benchao Li<[hidden email]> 写道: 这是两个问题, - 状态只访问一次,可能不会清理。 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。 - 状态已经过期了,但是会被使用到。 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。 [1] https://issues.apache.org/jira/browse/FLINK-16581 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:06写道: 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, 所以这个问题现在是不能完全避免了。 我已经建了一个jira[1]来跟踪和改进这一点。 [1] https://issues.apache.org/jira/browse/FLINK-17199 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: Hi, 你用的是哪个版本呢? 在1.9版本里面的确是有点问题,默认没有开启cleanup in background [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-15938 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: 我在flink sql中设置了 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); sql: select * from test t join test2 t2 on t.a=t2.a 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。
酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午3:09写道: > 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有 > tableConfig.setIdleStateRetentionTime(Time.minutes(1), > Time.minutes(6));这种方式设置ttl > > > > > 在2020年4月17日 14:54,Benchao Li<[hidden email]> 写道: > 嗯,blink planner跟legacy planner是有一些实现上的差异。 > 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中: > > static StateTtlConfig createTtlConfig(long retentionTime, boolean > stateCleaningEnabled) { > if (stateCleaningEnabled) { > checkArgument(retentionTime > 0); > return StateTtlConfig > .newBuilder(Time.milliseconds(retentionTime)) > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > .cleanupInBackground() // added this line > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > // changed this line > .build(); > } else { > return StateTtlConfig.DISABLED; > } > } > > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:47写道: > > 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期 > > > > > 在2020年4月17日 14:16,Benchao Li<[hidden email]> 写道: > 这是两个问题, > > - 状态只访问一次,可能不会清理。 > > > > 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。 > - 状态已经过期了,但是会被使用到。 > 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。 > > [1] https://issues.apache.org/jira/browse/FLINK-16581 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:06写道: > > 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 > > > > > 在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: > 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, > 所以这个问题现在是不能完全避免了。 > 我已经建了一个jira[1]来跟踪和改进这一点。 > > [1] https://issues.apache.org/jira/browse/FLINK-17199 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: > > > > > > 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? > 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: > 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 > > > > > 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: > Hi, > > 你用的是哪个版本呢? > 在1.9版本里面的确是有点问题,默认没有开启cleanup in background > [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 > > [1] https://issues.apache.org/jira/browse/FLINK-15938 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: > > > > 我在flink sql中设置了 > tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); > sql: select * from test t join test2 t2 on t.a=t2.a > > > > > > 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
好的,非常感谢您,我去按照您说的代码改下,非常感谢
在2020年4月17日 15:17,Benchao Li<[hidden email]> 写道: 嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午3:09写道: 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));这种方式设置ttl 在2020年4月17日 14:54,Benchao Li<[hidden email]> 写道: 嗯,blink planner跟legacy planner是有一些实现上的差异。 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中: static StateTtlConfig createTtlConfig(long retentionTime, boolean stateCleaningEnabled) { if (stateCleaningEnabled) { checkArgument(retentionTime > 0); return StateTtlConfig .newBuilder(Time.milliseconds(retentionTime)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .cleanupInBackground() // added this line .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // changed this line .build(); } else { return StateTtlConfig.DISABLED; } } 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:47写道: 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期 在2020年4月17日 14:16,Benchao Li<[hidden email]> 写道: 这是两个问题, - 状态只访问一次,可能不会清理。 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。 - 状态已经过期了,但是会被使用到。 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。 [1] https://issues.apache.org/jira/browse/FLINK-16581 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:06写道: 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, 所以这个问题现在是不能完全避免了。 我已经建了一个jira[1]来跟踪和改进这一点。 [1] https://issues.apache.org/jira/browse/FLINK-17199 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: Hi, 你用的是哪个版本呢? 在1.9版本里面的确是有点问题,默认没有开启cleanup in background [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-15938 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: 我在flink sql中设置了 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); sql: select * from test t join test2 t2 on t.a=t2.a 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:221) at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:205) at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible. at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) ... 10 more 在2020年4月17日 15:27,酷酷的浑蛋<[hidden email]> 写道: 好的,非常感谢您,我去按照您说的代码改下,非常感谢 在2020年4月17日 15:17,Benchao Li<[hidden email]> 写道: 嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午3:09写道: 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));这种方式设置ttl 在2020年4月17日 14:54,Benchao Li<[hidden email]> 写道: 嗯,blink planner跟legacy planner是有一些实现上的差异。 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中: static StateTtlConfig createTtlConfig(long retentionTime, boolean stateCleaningEnabled) { if (stateCleaningEnabled) { checkArgument(retentionTime > 0); return StateTtlConfig .newBuilder(Time.milliseconds(retentionTime)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .cleanupInBackground() // added this line .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // changed this line .build(); } else { return StateTtlConfig.DISABLED; } } 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:47写道: 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期 在2020年4月17日 14:16,Benchao Li<[hidden email]> 写道: 这是两个问题, - 状态只访问一次,可能不会清理。 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。 - 状态已经过期了,但是会被使用到。 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。 [1] https://issues.apache.org/jira/browse/FLINK-16581 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:06写道: 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, 所以这个问题现在是不能完全避免了。 我已经建了一个jira[1]来跟踪和改进这一点。 [1] https://issues.apache.org/jira/browse/FLINK-17199 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: Hi, 你用的是哪个版本呢? 在1.9版本里面的确是有点问题,默认没有开启cleanup in background [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-15938 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: 我在flink sql中设置了 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); sql: select * from test t join test2 t2 on t.a=t2.a 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
In reply to this post by 酷酷的浑蛋
hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:221) at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:205) at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible. at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) ... 10 more 在2020年4月17日 15:27,酷酷的浑蛋<[hidden email]> 写道: 好的,非常感谢您,我去按照您说的代码改下,非常感谢 在2020年4月17日 15:17,Benchao Li<[hidden email]> 写道: 嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午3:09写道: 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));这种方式设置ttl 在2020年4月17日 14:54,Benchao Li<[hidden email]> 写道: 嗯,blink planner跟legacy planner是有一些实现上的差异。 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中: static StateTtlConfig createTtlConfig(long retentionTime, boolean stateCleaningEnabled) { if (stateCleaningEnabled) { checkArgument(retentionTime > 0); return StateTtlConfig .newBuilder(Time.milliseconds(retentionTime)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .cleanupInBackground() // added this line .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // changed this line .build(); } else { return StateTtlConfig.DISABLED; } } 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:47写道: 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期 在2020年4月17日 14:16,Benchao Li<[hidden email]> 写道: 这是两个问题, - 状态只访问一次,可能不会清理。 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。 - 状态已经过期了,但是会被使用到。 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。 [1] https://issues.apache.org/jira/browse/FLINK-16581 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:06写道: 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, 所以这个问题现在是不能完全避免了。 我已经建了一个jira[1]来跟踪和改进这一点。 [1] https://issues.apache.org/jira/browse/FLINK-17199 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: Hi, 你用的是哪个版本呢? 在1.9版本里面的确是有点问题,默认没有开启cleanup in background [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-15938 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: 我在flink sql中设置了 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); sql: select * from test t join test2 t2 on t.a=t2.a 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
In reply to this post by 酷酷的浑蛋
我对checkpoint不是很熟悉。 不过这个错误看起来是因为修改了TtlConfig导致serializer不兼容导致的,可能不是很好解决。
酷酷的浑蛋 <[hidden email]> 于2020年4月21日周二 下午10:37写道: > hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错: > java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) > at > org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:221) > at > org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:205) > at > org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85) > at > org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.StateMigrationException: The new state > serializer cannot be incompatible. > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > ... 10 more > > > > > 在2020年4月17日 15:27,酷酷的浑蛋<[hidden email]> 写道: > 好的,非常感谢您,我去按照您说的代码改下,非常感谢 > > > > > 在2020年4月17日 15:17,Benchao Li<[hidden email]> 写道: > 嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午3:09写道: > > 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有 > tableConfig.setIdleStateRetentionTime(Time.minutes(1), > Time.minutes(6));这种方式设置ttl > > > > > 在2020年4月17日 14:54,Benchao Li<[hidden email]> 写道: > 嗯,blink planner跟legacy planner是有一些实现上的差异。 > 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中: > > static StateTtlConfig createTtlConfig(long retentionTime, boolean > stateCleaningEnabled) { > if (stateCleaningEnabled) { > checkArgument(retentionTime > 0); > return StateTtlConfig > .newBuilder(Time.milliseconds(retentionTime)) > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > .cleanupInBackground() // added this line > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > // changed this line > .build(); > } else { > return StateTtlConfig.DISABLED; > } > } > > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:47写道: > > 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期 > > > > > 在2020年4月17日 14:16,Benchao Li<[hidden email]> 写道: > 这是两个问题, > > - 状态只访问一次,可能不会清理。 > > > > > 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。 > - 状态已经过期了,但是会被使用到。 > 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。 > > [1] https://issues.apache.org/jira/browse/FLINK-16581 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:06写道: > > 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 > > > > > 在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: > 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, > 所以这个问题现在是不能完全避免了。 > 我已经建了一个jira[1]来跟踪和改进这一点。 > > [1] https://issues.apache.org/jira/browse/FLINK-17199 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: > > > > > > 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? > 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: > 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 > > > > > 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: > Hi, > > 你用的是哪个版本呢? > 在1.9版本里面的确是有点问题,默认没有开启cleanup in background > [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 > > [1] https://issues.apache.org/jira/browse/FLINK-15938 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: > > > > 我在flink sql中设置了 > tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); > sql: select * from test t join test2 t2 on t.a=t2.a > > > > > > > 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
社区版的 Planner 针对 Key 状态的清理,使用的 Timer 来进行清理。
1.9.1 Blink planner 最底层状态清理 还是使用的 StateTTLConfig 来进行清理(不是 Background),所以存在部分状态后面没读, 状态没有清理的情况 Benchao Li <[hidden email]> 于2020年4月21日周二 下午11:15写道: > 我对checkpoint不是很熟悉。 不过这个错误看起来是因为修改了TtlConfig导致serializer不兼容导致的,可能不是很好解决。 > > 酷酷的浑蛋 <[hidden email]> 于2020年4月21日周二 下午10:37写道: > > > hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错: > > java.lang.RuntimeException: Error while getting state > > at > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > > at > > > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) > > at > > > org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:221) > > at > > > org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:205) > > at > > > org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85) > > at > > > org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: org.apache.flink.util.StateMigrationException: The new state > > serializer cannot be incompatible. > > at > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534) > > at > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482) > > at > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643) > > at > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) > > at > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) > > at > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) > > at > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) > > at > > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) > > at > > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) > > at > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > > at > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > > ... 10 more > > > > > > > > > > 在2020年4月17日 15:27,酷酷的浑蛋<[hidden email]> 写道: > > 好的,非常感谢您,我去按照您说的代码改下,非常感谢 > > > > > > > > > > 在2020年4月17日 15:17,Benchao Li<[hidden email]> 写道: > > 嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。 > > > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午3:09写道: > > > > 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有 > > tableConfig.setIdleStateRetentionTime(Time.minutes(1), > > Time.minutes(6));这种方式设置ttl > > > > > > > > > > 在2020年4月17日 14:54,Benchao Li<[hidden email]> 写道: > > 嗯,blink planner跟legacy planner是有一些实现上的差异。 > > 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中: > > > > static StateTtlConfig createTtlConfig(long retentionTime, boolean > > stateCleaningEnabled) { > > if (stateCleaningEnabled) { > > checkArgument(retentionTime > 0); > > return StateTtlConfig > > .newBuilder(Time.milliseconds(retentionTime)) > > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > > .cleanupInBackground() // added this line > > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > > // changed this line > > .build(); > > } else { > > return StateTtlConfig.DISABLED; > > } > > } > > > > > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:47写道: > > > > 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期 > > > > > > > > > > 在2020年4月17日 14:16,Benchao Li<[hidden email]> 写道: > > 这是两个问题, > > > > - 状态只访问一次,可能不会清理。 > > > > > > > > > > > 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。 > > - 状态已经过期了,但是会被使用到。 > > 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。 > > > > [1] https://issues.apache.org/jira/browse/FLINK-16581 > > > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午2:06写道: > > > > 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 > > > > > > > > > > 在2020年4月17日 13:07,Benchao Li<[hidden email]> 写道: > > 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, > > 所以这个问题现在是不能完全避免了。 > > 我已经建了一个jira[1]来跟踪和改进这一点。 > > > > [1] https://issues.apache.org/jira/browse/FLINK-17199 > > > > 酷酷的浑蛋 <[hidden email]> 于2020年4月17日周五 下午12:51写道: > > > > > > > > > > > > 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢? > > 在2020年4月16日 15:28,酷酷的浑蛋<[hidden email]> 写道: > > 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 > > > > > > > > > > 在2020年4月15日 18:04,Benchao Li<[hidden email]> 写道: > > Hi, > > > > 你用的是哪个版本呢? > > 在1.9版本里面的确是有点问题,默认没有开启cleanup in background > > [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 > > > > [1] https://issues.apache.org/jira/browse/FLINK-15938 > > > > 酷酷的浑蛋 <[hidden email]> 于2020年4月15日周三 下午5:40写道: > > > > > > > > 我在flink sql中设置了 > > tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); > > sql: select * from test t join test2 t2 on t.a=t2.a > > > > > > > > > > > > > > > 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 > > > > > > > > -- > > > > Benchao Li > > School of Electronics Engineering and Computer Science, Peking University > > Tel:+86-15650713730 > > Email: [hidden email]; [hidden email] > > > > > > > > -- > > > > Benchao Li > > School of Electronics Engineering and Computer Science, Peking University > > Tel:+86-15650713730 > > Email: [hidden email]; [hidden email] > > > > > > > > -- > > > > Benchao Li > > School of Electronics Engineering and Computer Science, Peking University > > Tel:+86-15650713730 > > Email: [hidden email]; [hidden email] > > > > > > > > -- > > > > Benchao Li > > School of Electronics Engineering and Computer Science, Peking University > > Tel:+86-15650713730 > > Email: [hidden email]; [hidden email] > > > > > > > > -- > > > > Benchao Li > > School of Electronics Engineering and Computer Science, Peking University > > Tel:+86-15650713730 > > Email: [hidden email]; [hidden email] > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > |
Free forum by Nabble | Edit this page |