blinkSQL架构会自动清理过期的state吗

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

blinkSQL架构会自动清理过期的state吗

守护
社区各位大佬:
请教一个问题,flink1.9中使用blink SQL语句,设置时间窗口,state存储方式选择FSStateBackend,现在发现State一直在增大,过了窗口后也没有删除过期state,是blink架构就不支持窗口自动清理state吗,还是我哪使用的不对,我测试1.9的flinkSQL是不会有这个问题的。
Reply | Threaded
Open this post in threaded view
|

Re: blinkSQL架构会自动清理过期的state吗

Jark
Administrator
Hi,
能提供下 SQL 么?

blink sql 的 window 理论上是会自动清理的。

> 在 2019年9月11日,18:56,守护 <[hidden email]> 写道:
>
> 社区各位大佬:
> 请教一个问题,flink1.9中使用blink SQL语句,设置时间窗口,state存储方式选择FSStateBackend,现在发现State一直在增大,过了窗口后也没有删除过期state,是blink架构就不支持窗口自动清理state吗,还是我哪使用的不对,我测试1.9的flinkSQL是不会有这个问题的。

Reply | Threaded
Open this post in threaded view
|

回复: blinkSQL架构会自动清理过期的state吗

守护
下面是我要执行的代码,麻烦帮看一下:


&nbsp; &nbsp; tableEnv.registerDataStream("testCountTable", waterMarkStream, 'curuserid,'timelong,'rowtime.rowtime)
&nbsp; &nbsp;&nbsp;
&nbsp; &nbsp; val result = tableEnv.sqlQuery(s"SELECT COUNT(0) as pv,COUNT(distinct curuserid)" +
&nbsp; &nbsp; &nbsp; s" as uv,TUMBLE_END(rowtime, INTERVAL '10' MINUTE) FROM testCountTable GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)")


&nbsp; &nbsp; val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](result)


&nbsp; &nbsp; val data=dsRow.map(w =&gt; {
&nbsp; &nbsp; &nbsp; val StrArrary = w.toString.split(",")
&nbsp; &nbsp; &nbsp; val str="{\"pv\":"+"\""+StrArrary(0)+"\""+",\"uv\":"+"\""+StrArrary(1)+"\""+",\"rowtime\":"+"\""+StrArrary(2)+"\""+"}"
&nbsp; &nbsp; &nbsp; str
&nbsp; &nbsp; })
&nbsp; &nbsp; data.print()





------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Jark Wu"<[hidden email]&gt;;
发送时间:&nbsp;2019年9月12日(星期四) 中午12:51
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: blinkSQL架构会自动清理过期的state吗



Hi,
能提供下 SQL 么?

blink sql 的 window 理论上是会自动清理的。

&gt; 在 2019年9月11日,18:56,守护 <[hidden email]&gt; 写道:
&gt;
&gt; 社区各位大佬:
&gt; 请教一个问题,flink1.9中使用blink SQL语句,设置时间窗口,state存储方式选择FSStateBackend,现在发现State一直在增大,过了窗口后也没有删除过期state,是blink架构就不支持窗口自动清理state吗,还是我哪使用的不对,我测试1.9的flinkSQL是不会有这个问题的。
Reply | Threaded
Open this post in threaded view
|

Re: blinkSQL架构会自动清理过期的state吗

LakeShen
Hi 守护,
*    可*以先确保你的watermark是否一直在更新。
    然后可以把窗口时间间隔设置小一点,然后在运行程序,看看checkpoint 的状态文件是否会自动清理。



守护 <[hidden email]> 于2019年9月12日周四 下午2:35写道:

> 下面是我要执行的代码,麻烦帮看一下:
>
>
> &nbsp; &nbsp; tableEnv.registerDataStream("testCountTable",
> waterMarkStream, 'curuserid,'timelong,'rowtime.rowtime)
> &nbsp; &nbsp;&nbsp;
> &nbsp; &nbsp; val result = tableEnv.sqlQuery(s"SELECT COUNT(0) as
> pv,COUNT(distinct curuserid)" +
> &nbsp; &nbsp; &nbsp; s" as uv,TUMBLE_END(rowtime, INTERVAL '10' MINUTE)
> FROM testCountTable GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)")
>
>
> &nbsp; &nbsp; val dsRow: DataStream[Row] =
> tableEnv.toAppendStream[Row](result)
>
>
> &nbsp; &nbsp; val data=dsRow.map(w =&gt; {
> &nbsp; &nbsp; &nbsp; val StrArrary = w.toString.split(",")
> &nbsp; &nbsp; &nbsp; val
> str="{\"pv\":"+"\""+StrArrary(0)+"\""+",\"uv\":"+"\""+StrArrary(1)+"\""+",\"rowtime\":"+"\""+StrArrary(2)+"\""+"}"
> &nbsp; &nbsp; &nbsp; str
> &nbsp; &nbsp; })
> &nbsp; &nbsp; data.print()
>
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;;
> 发送时间:&nbsp;2019年9月12日(星期四) 中午12:51
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: blinkSQL架构会自动清理过期的state吗
>
>
>
> Hi,
> 能提供下 SQL 么?
>
> blink sql 的 window 理论上是会自动清理的。
>
> &gt; 在 2019年9月11日,18:56,守护 <[hidden email]&gt; 写道:
> &gt;
> &gt; 社区各位大佬:
> &gt; 请教一个问题,flink1.9中使用blink
> SQL语句,设置时间窗口,state存储方式选择FSStateBackend,现在发现State一直在增大,过了窗口后也没有删除过期state,是blink架构就不支持窗口自动清理state吗,还是我哪使用的不对,我测试1.9的flinkSQL是不会有这个问题的。
Reply | Threaded
Open this post in threaded view
|

回复: blinkSQL架构会自动清理过期的state吗

守护
&nbsp;感谢你的回复
&nbsp; &nbsp; &nbsp;blinkSQL目前测试了很多次,watermark和窗口间隔都调整了,checkpoint的状态还是不会自动动清理,在flinkSQL中就不存在这种问题,所以问一下大家在使用blinkSQL中有没有发现这个问题呢






------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"LakeShen"<[hidden email]&gt;;
发送时间:&nbsp;2019年9月12日(星期四) 下午4:18
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: blinkSQL架构会自动清理过期的state吗



Hi 守护,
*&nbsp;&nbsp;&nbsp; 可*以先确保你的watermark是否一直在更新。
&nbsp;&nbsp;&nbsp; 然后可以把窗口时间间隔设置小一点,然后在运行程序,看看checkpoint 的状态文件是否会自动清理。



守护 <[hidden email]&gt; 于2019年9月12日周四 下午2:35写道:

&gt; 下面是我要执行的代码,麻烦帮看一下:
&gt;
&gt;
&gt; &amp;nbsp; &amp;nbsp; tableEnv.registerDataStream("testCountTable",
&gt; waterMarkStream, 'curuserid,'timelong,'rowtime.rowtime)
&gt; &amp;nbsp; &amp;nbsp;&amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; val result = tableEnv.sqlQuery(s"SELECT COUNT(0) as
&gt; pv,COUNT(distinct curuserid)" +
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; s" as uv,TUMBLE_END(rowtime, INTERVAL '10' MINUTE)
&gt; FROM testCountTable GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)")
&gt;
&gt;
&gt; &amp;nbsp; &amp;nbsp; val dsRow: DataStream[Row] =
&gt; tableEnv.toAppendStream[Row](result)
&gt;
&gt;
&gt; &amp;nbsp; &amp;nbsp; val data=dsRow.map(w =&amp;gt; {
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; val StrArrary = w.toString.split(",")
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; val
&gt; str="{\"pv\":"+"\""+StrArrary(0)+"\""+",\"uv\":"+"\""+StrArrary(1)+"\""+",\"rowtime\":"+"\""+StrArrary(2)+"\""+"}"
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; str
&gt; &amp;nbsp; &amp;nbsp; })
&gt; &amp;nbsp; &amp;nbsp; data.print()
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Jark Wu"<[hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2019年9月12日(星期四) 中午12:51
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: blinkSQL架构会自动清理过期的state吗
&gt;
&gt;
&gt;
&gt; Hi,
&gt; 能提供下 SQL 么?
&gt;
&gt; blink sql 的 window 理论上是会自动清理的。
&gt;
&gt; &amp;gt; 在 2019年9月11日,18:56,守护 <[hidden email]&amp;gt; 写道:
&gt; &amp;gt;
&gt; &amp;gt; 社区各位大佬:
&gt; &amp;gt; 请教一个问题,flink1.9中使用blink
&gt; SQL语句,设置时间窗口,state存储方式选择FSStateBackend,现在发现State一直在增大,过了窗口后也没有删除过期state,是blink架构就不支持窗口自动清理state吗,还是我哪使用的不对,我测试1.9的flinkSQL是不会有这个问题的。