Flink sql 状态清理问题

classic Classic list List threaded Threaded
2 messages Options
op
Reply | Threaded
Open this post in threaded view
|

Flink sql 状态清理问题

op
hi,
写了个测试程序:

......
val tConfig = bstEnv.getConfig
confg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25))
......
val q1=bstEnv.sqlQuery(
"""select createTime,feedid from source
|where circleName is not null
|and circleName not in('','_')
|and action = 'C_FEED_EDIT_SEND'
|""".stripMargin)

bstEnv.createTemporaryView("sourcefeed",q1)
val q2=bstEnv.sqlQuery(
"""select feedid,postfeedid,action from source
|where circleName is not null
|and circleName not in('','_')
|and action in('C_PUBLISH','C_FORWARD_PUBLISH')
|""".stripMargin)

bstEnv.createTemporaryView("postfeed",q2)
bstEnv.sqlQuery(
"""
|select count(b.postfeedid) from
|sourcefeed a
|join postfeed b
|on a.feedid=b.postfeedid
""".stripMargin).toRetractStream[Row](confg).print("")

//------------------------------------
程序里面设置了状态失效最晚时间是空闲25分钟,但是运行了几天了,我再web上观察到的状态一直再不断增加,可以确定关联的id最多只会活跃1个小时左右,请问是哪里没设置对还是join两边的state不支持清理?
Reply | Threaded
Open this post in threaded view
|

Re: Flink sql 状态清理问题

Benchao Li-2
Hi,

Join算子的state是支持清理的。
可以提供下以下信息:
- Flink 版本
- planner (blink planner / old planner)

op <[hidden email]> 于2020年6月10日周三 下午4:08写道:

> hi,
> 写了个测试程序:
>
> ......
>
> val tConfig = bstEnv.getConfig
>
> confg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25))
>
> ......
>
> val q1=bstEnv.sqlQuery(
>   """select createTime,feedid from source
>     |where circleName is not null
>     |and circleName not in('','_')
>     |and action = 'C_FEED_EDIT_SEND'
>     |""".stripMargin)
>
>
>  bstEnv.createTemporaryView("sourcefeed",q1)
> val q2=bstEnv.sqlQuery(
>   """select feedid,postfeedid,action from source
>     |where circleName is not null
>     |and circleName not in('','_')
>     |and action in('C_PUBLISH','C_FORWARD_PUBLISH')
>     |""".stripMargin)
>
> bstEnv.createTemporaryView("postfeed",q2)
> bstEnv.sqlQuery(
>   """
>     |select count(b.postfeedid) from
>     |sourcefeed a
>     |join postfeed b
>     |on a.feedid=b.postfeedid
>   """.stripMargin).toRetractStream[Row](confg).print("")
>
>
> //------------------------------------
>
> 程序里面设置了状态失效最晚时间是空闲25分钟,但是运行了几天了,我再web上观察到的状态一直再不断增加,可以确定关联的id最多只会活跃1个小时左右,请问是哪里没设置对还是join两边的state不支持清理?
>
>