大家好,最近发现一个问题
发现同一套代码,用oldPlanner设置IdleStateRetentionTime后join两边表的状态可以被定时清理,但是用blinkplanner同样的代码,运行却发现状态不会被清理,这是个bug吗? |
Hi,
可以稳定复现吗?麻烦贴下flink版本和你的case, 我可以帮忙跟进确认下 Best, Leonard Xu > 在 2020年6月11日,14:30,op <[hidden email]> 写道: > > 大家好,最近发现一个问题 > 发现同一套代码,用oldPlanner设置IdleStateRetentionTime后join两边表的状态可以被定时清理,但是用blinkplanner同样的代码,运行却发现状态不会被清理,这是个bug吗? |
一开始用的Blinkplanner,试了几天状态都清理不掉,改成oldplanner后就可以了,版本是1.10
package test.table.sql import java.util.Properties import com.souhu.msns.huyou.PublicParams import com.souhu.msns.huyou.utils.KafkaPbSchema import org.apache.flink.api.common.time.Time import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala._ import org.apache.flink.streaming.api.windowing.time.{Time => WindowTime} import org.apache.flink.types.Row object test { def main(args: Array[String]): Unit = { //----------------------------配置执行环境------------------------------------------------ val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment bsEnv.setNumberOfExecutionRetries(1) bsEnv.setParallelism(1) //bsEnv.getConfig.setAutoWatermarkInterval(10000) bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) bsEnv.setStateBackend(new FsStateBackend("hdfs://dc1:8020/user/msns/streaming/checkpoint/flink/Circ", true)) bsEnv.getCheckpointConfig.setCheckpointInterval(300000) bsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000) bsEnv.setParallelism(3) bsEnv.setNumberOfExecutionRetries(1) //----------------------------配置TABLE环境------------------------------------------------ val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bstEnv = StreamTableEnvironment.create(bsEnv,setting) val tConfig = bstEnv.getConfig tConfig.setIdleStateRetentionTime(Time.minutes(10),Time.minutes(20)) val config = bstEnv.getConfig.getConfiguration() config.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled config.setString("table.exec.mini-batch.allow-latency", "5 s") config.setString("table.exec.mini-batch.size", "5000") config.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation config.setString("table.optimizer.distinct-agg.split.enabled", "true") //bstEnv.getConfig.setLocalTimeZone(ZoneId.of("Etc/GMT+8")) //----------------------------创建数据源和表------------------------------------------------ val kafkaProps = new Properties() kafkaProps.setProperty("bootstrap.servers", PublicParams.brokers) val source = .... .toTable(bstEnv,'userId,'createTime.rowtime,'action,'circleName,'flowName,'ts,'content,'feedid,'postfeedid,'sessionId) bstEnv.createTemporaryView("source",source) val q1=bstEnv.sqlQuery( """select sessionId from source |where sessionId is not null |and action='P_TIMELINE'""".stripMargin) q1.toAppendStream[Row].print("source") bstEnv.createTemporaryView("sourcefeed",q1) val q2=bstEnv.sqlQuery( """select sessionId from source |where sessionId is not null |and action='V_TIMELINE_FEED'""".stripMargin) bstEnv.createTemporaryView("postfeed",q2) bstEnv.sqlQuery( """ |select count(b.sessionId) from |sourcefeed a |join postfeed b |on a.sessionId=b.sessionId """.stripMargin).toRetractStream[Row].print("") bstEnv.execute("") } } ------------------ 原始邮件 ------------------ 发件人: "Leonard Xu"<[hidden email]>; 发送时间: 2020年6月11日(星期四) 下午2:40 收件人: "user-zh"<[hidden email]>; 主题: Re: BLinkPlanner sql join状态清理 Hi, 可以稳定复现吗?麻烦贴下flink版本和你的case, 我可以帮忙跟进确认下 Best, Leonard Xu > 在 2020年6月11日,14:30,op <[hidden email]> 写道: > > 大家好,最近发现一个问题 > 发现同一套代码,用oldPlanner设置IdleStateRetentionTime后join两边表的状态可以被定时清理,但是用blinkplanner同样的代码,运行却发现状态不会被清理,这是个bug吗? |
我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
<http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png> 不太清楚为什么用了mini batch就没读取这个配置。 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么? -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi Ericliuk,
这应该是实现的bug,你可以去社区建一个issue描述下这个问题。 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~ Ericliuk <[hidden email]> 于2020年9月29日周二 下午4:59写道: > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。 > < > http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png> > > > 不太清楚为什么用了mini batch就没读取这个配置。 > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li |
miniBatch下是无法ttl的,这个是修复方案:https://github.com/apache/flink/pull/11830
Benchao Li <[hidden email]> 于2020年9月29日周二 下午5:18写道: > Hi Ericliuk, > > 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。 > 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~ > > Ericliuk <[hidden email]> 于2020年9月29日周二 下午4:59写道: > > > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。 > > < > > > http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png > > > > > > > > 不太清楚为什么用了mini batch就没读取这个配置。 > > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么? > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > > > -- > > Best, > Benchao Li > |
Hi, MiniBatch Agg目前没有实现State TTl,我提了个PR修复这个问题,参考https://github.com/apache/flink/pull/11830
@Jark,辛苦有空时帮忙reveiw一下代码,这个问题越来越多用户用户遇到了。 > -----原始邮件----- > 发件人: "刘建刚" <[hidden email]> > 发送时间: 2020-09-29 18:27:47 (星期二) > 收件人: user-zh <[hidden email]> > 抄送: > 主题: Re: 回复: BLinkPlanner sql join状态清理 > > miniBatch下是无法ttl的,这个是修复方案:https://github.com/apache/flink/pull/11830 > > Benchao Li <[hidden email]> 于2020年9月29日周二 下午5:18写道: > > > Hi Ericliuk, > > > > 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。 > > 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~ > > > > Ericliuk <[hidden email]> 于2020年9月29日周二 下午4:59写道: > > > > > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。 > > > < > > > > > http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png > > > > > > > > > > > > 不太清楚为什么用了mini batch就没读取这个配置。 > > > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么? > > > > > > > > > > > > -- > > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > > > > > > > -- > > > > Best, > > Benchao Li > > ------------------------------ Best |
Free forum by Nabble | Edit this page |