flink sql状态清理问题

classic Classic list List threaded Threaded
7 messages Options
op
Reply | Threaded
Open this post in threaded view
|

flink sql状态清理问题

op
Hi
    在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
  val config = tableConfig.getConfiguration()
    config.setString("table.exec.mini-batch.enabled", "true")
    config.setString("table.exec.mini-batch.allow-latency", "5s")
    config.setString("table.exec.mini-batch.size", "20")  无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;  同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢
Reply | Threaded
Open this post in threaded view
|

Re: flink sql状态清理问题

Benchao Li-2
Hi,

最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。

op <[hidden email]> 于2020年8月10日周一 上午10:27写道:

> Hi
> &nbsp; &nbsp; 在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
> &nbsp; val config = tableConfig.getConfiguration()
> &nbsp; &nbsp;&nbsp;config.setString("table.exec.mini-batch.enabled",
> "true")
> &nbsp; &nbsp;&nbsp;config.setString("table.exec.mini-batch.allow-latency",
> "5s")
> &nbsp; &nbsp;&nbsp;config.setString("table.exec.mini-batch.size", "20")
> 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;
> 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: flink sql状态清理问题

godfrey he
配置了 TableConfig 中的 minIdleStateRetentionTime 和 maxIdleStateRetentionTime 吗?

Benchao Li <[hidden email]> 于2020年8月10日周一 上午10:36写道:

> Hi,
>
> 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。
>
> op <[hidden email]> 于2020年8月10日周一 上午10:27写道:
>
> > Hi
> > &nbsp; &nbsp; 在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
> > &nbsp; val config = tableConfig.getConfiguration()
> > &nbsp; &nbsp;&nbsp;config.setString("table.exec.mini-batch.enabled",
> > "true")
> > &nbsp;
> &nbsp;&nbsp;config.setString("table.exec.mini-batch.allow-latency",
> > "5s")
> > &nbsp; &nbsp;&nbsp;config.setString("table.exec.mini-batch.size", "20")
> >
> 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;
> > 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢
>
>
>
> --
>
> Best,
> Benchao Li
>
op
Reply | Threaded
Open this post in threaded view
|

回复: flink sql状态清理问题

op
配置了minIdleStateRetentionTime ,
val tConfig = tableEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.minutes(5), Time.minutes(10))
使用的是1.11.0版本,sql就是一个简单测试,按照sessionid groupby count(*),一个sessionid一般1分钟内就会失效,
问题是同一套代码,目前观察到的是配置minibatch后影响状态清理了


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年8月10日(星期一) 上午10:44
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: flink sql状态清理问题



配置了 TableConfig 中的 minIdleStateRetentionTime 和 maxIdleStateRetentionTime 吗?

Benchao Li <[hidden email]&gt; 于2020年8月10日周一 上午10:36写道:

&gt; Hi,
&gt;
&gt; 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。
&gt;
&gt; op <[hidden email]&gt; 于2020年8月10日周一 上午10:27写道:
&gt;
&gt; &gt; Hi
&gt; &gt; &amp;nbsp; &amp;nbsp; 在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
&gt; &gt; &amp;nbsp; val config = tableConfig.getConfiguration()
&gt; &gt; &amp;nbsp; &amp;nbsp;&amp;nbsp;config.setString("table.exec.mini-batch.enabled",
&gt; &gt; "true")
&gt; &gt; &amp;nbsp;
&gt; &amp;nbsp;&amp;nbsp;config.setString("table.exec.mini-batch.allow-latency",
&gt; &gt; "5s")
&gt; &gt; &amp;nbsp; &amp;nbsp;&amp;nbsp;config.setString("table.exec.mini-batch.size", "20")
&gt; &gt;
&gt; 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;
&gt; &gt; 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢
&gt;
&gt;
&gt;
&gt; --
&gt;
&gt; Best,
&gt; Benchao Li
&gt;
Reply | Threaded
Open this post in threaded view
|

Re: 回复: flink sql状态清理问题

刘大龙
Hi,
我看你开了minibatch,你用了aggregate算子了吗?


> -----原始邮件-----
> 发件人: op <[hidden email]>
> 发送时间: 2020-08-10 10:50:08 (星期一)
> 收件人: user-zh <[hidden email]>
> 抄送:
> 主题: 回复: flink sql状态清理问题
>
> 配置了minIdleStateRetentionTime ,
> val tConfig = tableEnv.getConfig
> tConfig.setIdleStateRetentionTime(Time.minutes(5), Time.minutes(10))
> 使用的是1.11.0版本,sql就是一个简单测试,按照sessionid groupby count(*),一个sessionid一般1分钟内就会失效,
> 问题是同一套代码,目前观察到的是配置minibatch后影响状态清理了
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
> 发送时间:&nbsp;2020年8月10日(星期一) 上午10:44
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: flink sql状态清理问题
>
>
>
> 配置了 TableConfig 中的 minIdleStateRetentionTime 和 maxIdleStateRetentionTime 吗?
>
> Benchao Li <[hidden email]&gt; 于2020年8月10日周一 上午10:36写道:
>
> &gt; Hi,
> &gt;
> &gt; 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。
> &gt;
> &gt; op <[hidden email]&gt; 于2020年8月10日周一 上午10:27写道:
> &gt;
> &gt; &gt; Hi
> &gt; &gt; &amp;nbsp; &amp;nbsp; 在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
> &gt; &gt; &amp;nbsp; val config = tableConfig.getConfiguration()
> &gt; &gt; &amp;nbsp; &amp;nbsp;&amp;nbsp;config.setString("table.exec.mini-batch.enabled",
> &gt; &gt; "true")
> &gt; &gt; &amp;nbsp;
> &gt; &amp;nbsp;&amp;nbsp;config.setString("table.exec.mini-batch.allow-latency",
> &gt; &gt; "5s")
> &gt; &gt; &amp;nbsp; &amp;nbsp;&amp;nbsp;config.setString("table.exec.mini-batch.size", "20")
> &gt; &gt;
> &gt; 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;
> &gt; &gt; 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢
> &gt;
> &gt;
> &gt;
> &gt; --
> &gt;
> &gt; Best,
> &gt; Benchao Li
> &gt;


------------------------------
刘大龙

浙江大学 控制系 智能系统与控制研究所 工控新楼217
地址:浙江省杭州市浙大路38号浙江大学玉泉校区
Tel:18867547281
op
Reply | Threaded
Open this post in threaded view
|

回复: 回复: flink sql状态清理问题

op
hi
grouby count(*)不是吗




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年8月10日(星期一) 下午2:13
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 回复: flink sql状态清理问题



Hi,
我看你开了minibatch,你用了aggregate算子了吗?


&gt; -----原始邮件-----
&gt; 发件人: op <[hidden email]&gt;
&gt; 发送时间: 2020-08-10 10:50:08 (星期一)
&gt; 收件人: user-zh <[hidden email]&gt;
&gt; 抄送:
&gt; 主题: 回复: flink sql状态清理问题
&gt;
&gt; 配置了minIdleStateRetentionTime ,
&gt; val tConfig = tableEnv.getConfig
&gt; tConfig.setIdleStateRetentionTime(Time.minutes(5), Time.minutes(10))
&gt; 使用的是1.11.0版本,sql就是一个简单测试,按照sessionid groupby count(*),一个sessionid一般1分钟内就会失效,
&gt; 问题是同一套代码,目前观察到的是配置minibatch后影响状态清理了
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <[hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年8月10日(星期一) 上午10:44
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: flink sql状态清理问题
&gt;
&gt;
&gt;
&gt; 配置了 TableConfig 中的 minIdleStateRetentionTime 和 maxIdleStateRetentionTime 吗?
&gt;
&gt; Benchao Li <[hidden email]&amp;gt; 于2020年8月10日周一 上午10:36写道:
&gt;
&gt; &amp;gt; Hi,
&gt; &amp;gt;
&gt; &amp;gt; 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。
&gt; &amp;gt;
&gt; &amp;gt; op <[hidden email]&amp;gt; 于2020年8月10日周一 上午10:27写道:
&gt; &amp;gt;
&gt; &amp;gt; &amp;gt; Hi
&gt; &amp;gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
&gt; &amp;gt; &amp;gt; &amp;amp;nbsp; val config = tableConfig.getConfiguration()
&gt; &amp;gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;&amp;amp;nbsp;config.setString("table.exec.mini-batch.enabled",
&gt; &amp;gt; &amp;gt; "true")
&gt; &amp;gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;config.setString("table.exec.mini-batch.allow-latency",
&gt; &amp;gt; &amp;gt; "5s")
&gt; &amp;gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;&amp;amp;nbsp;config.setString("table.exec.mini-batch.size", "20")
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;
&gt; &amp;gt; &amp;gt; 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; --
&gt; &amp;gt;
&gt; &amp;gt; Best,
&gt; &amp;gt; Benchao Li
&gt; &amp;gt;


------------------------------
刘大龙

浙江大学 控制系 智能系统与控制研究所 工控新楼217
地址:浙江省杭州市浙大路38号浙江大学玉泉校区
Tel:18867547281
Reply | Threaded
Open this post in threaded view
|

Re: 回复: flink sql状态清理问题

Benchao Li-2
Hi,

我看了一下mini-batch的聚合函数的实现,的确是没有开启状态清理,我建了一个issue[1] 来跟进修复这个bug。

[1] https://issues.apache.org/jira/browse/FLINK-18872

op <[hidden email]> 于2020年8月10日周一 下午4:49写道:

> hi
> grouby count(*)不是吗
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> [hidden email]&gt;;
> 发送时间:&nbsp;2020年8月10日(星期一) 下午2:13
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: 回复: flink sql状态清理问题
>
>
>
> Hi,
> 我看你开了minibatch,你用了aggregate算子了吗?
>
>
> &gt; -----原始邮件-----
> &gt; 发件人: op <[hidden email]&gt;
> &gt; 发送时间: 2020-08-10 10:50:08 (星期一)
> &gt; 收件人: user-zh <[hidden email]&gt;
> &gt; 抄送:
> &gt; 主题: 回复: flink sql状态清理问题
> &gt;
> &gt; 配置了minIdleStateRetentionTime ,
> &gt; val tConfig = tableEnv.getConfig
> &gt; tConfig.setIdleStateRetentionTime(Time.minutes(5), Time.minutes(10))
> &gt; 使用的是1.11.0版本,sql就是一个简单测试,按照sessionid groupby
> count(*),一个sessionid一般1分钟内就会失效,
> &gt; 问题是同一套代码,目前观察到的是配置minibatch后影响状态清理了
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt;
> 发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> <[hidden email]&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2020年8月10日(星期一) 上午10:44
> &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: flink sql状态清理问题
> &gt;
> &gt;
> &gt;
> &gt; 配置了 TableConfig 中的 minIdleStateRetentionTime 和
> maxIdleStateRetentionTime 吗?
> &gt;
> &gt; Benchao Li <[hidden email]&amp;gt; 于2020年8月10日周一 上午10:36写道:
> &gt;
> &gt; &amp;gt; Hi,
> &gt; &amp;gt;
> &gt; &amp;gt; 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。
> &gt; &amp;gt;
> &gt; &amp;gt; op <[hidden email]&amp;gt; 于2020年8月10日周一 上午10:27写道:
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;gt; Hi
> &gt; &amp;gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 在使用flink
> sql的过程中遇到如下情况,在配置了如下选项后:
> &gt; &amp;gt; &amp;gt; &amp;amp;nbsp; val config =
> tableConfig.getConfiguration()
> &gt; &amp;gt; &amp;gt; &amp;amp;nbsp;
> &amp;amp;nbsp;&amp;amp;nbsp;config.setString("table.exec.mini-batch.enabled",
> &gt; &amp;gt; &amp;gt; "true")
> &gt; &amp;gt; &amp;gt; &amp;amp;nbsp;
> &gt; &amp;gt;
> &amp;amp;nbsp;&amp;amp;nbsp;config.setString("table.exec.mini-batch.allow-latency",
> &gt; &amp;gt; &amp;gt; "5s")
> &gt; &amp;gt; &amp;gt; &amp;amp;nbsp;
> &amp;amp;nbsp;&amp;amp;nbsp;config.setString("table.exec.mini-batch.size",
> "20")
> &gt; &amp;gt; &amp;gt;
> &gt; &amp;gt;
> 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;
> &gt; &amp;gt; &amp;gt;
> 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; --
> &gt; &amp;gt;
> &gt; &amp;gt; Best,
> &gt; &amp;gt; Benchao Li
> &gt; &amp;gt;
>
>
> ------------------------------
> 刘大龙
>
> 浙江大学 控制系 智能系统与控制研究所 工控新楼217
> 地址:浙江省杭州市浙大路38号浙江大学玉泉校区
> Tel:18867547281



--

Best,
Benchao Li