使用滚动窗口的 Flink SQL State 一直增加

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

使用滚动窗口的 Flink SQL State 一直增加

瓜牛
hi,大家好!

现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加

SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和 role_id 的去重数

疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state 大小却一直是增加的,是 SQL 写得有问题吗?

麻烦大家帮我看一下

谢谢!

----------------

CREATE TABLE source_kafka (
  dtime string,
  wm as cast(dtime as TIMESTAMP(3)),
  server string,
  reason string,
  role_id string,
  WATERMARK FOR wm AS wm - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.group.id' = 'xxx',
  'format.type' = 'json',
)
-----------------

CREATE TABLE sink_kafka (
  window_time string,
  server string,
  reason string,
  role_id_distinct_cnt BIGINT,
  log_cnt BIGINT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'format.type' = 'json'
)
-----------------

INSERT INTO sink_kafka
SELECT
 DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss') AS window_time,
 server,
 reason,
 COUNT(DISTINCT role_id) AS role_id_distinct_cnt,
 COUNT(1) AS log_cnt
FROM source_kafka
GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason
Reply | Threaded
Open this post in threaded view
|

Re: 使用滚动窗口的 Flink SQL State 一直增加

Benchao Li
Hi,

看起来你的写法应该没有太大问题。可能有两个问题需要确认一下:
1. 你的watermark生成的正确吗?也就是说window的结果有正常输出么?如果watermark延迟很高,是会导致有多个window同时存在的
2. 你是怎么判断state上升呢?通过checkpoint看出来的?还是看到heap一直上升?

瓜牛 <[hidden email]> 于2020年5月26日周二 下午6:07写道:

> hi,大家好!
>
> 现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加
>
> SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和
> role_id 的去重数
>
> 疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state
> 大小却一直是增加的,是 SQL 写得有问题吗?
>
> 麻烦大家帮我看一下
>
> 谢谢!
>
> ----------------
>
> CREATE TABLE source_kafka (
>   dtime string,
>   wm as cast(dtime as TIMESTAMP(3)),
>   server string,
>   reason string,
>   role_id string,
>   WATERMARK FOR wm AS wm - INTERVAL '5' SECOND
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = '0.11',
>   'connector.topic' = 'xxx',
>   'connector.properties.bootstrap.servers' = 'xxx',
>   'connector.properties.zookeeper.connect' = 'xxx',
>   'connector.properties.group.id' = 'xxx',
>   'format.type' = 'json',
> )
> -----------------
>
> CREATE TABLE sink_kafka (
>   window_time string,
>   server string,
>   reason string,
>   role_id_distinct_cnt BIGINT,
>   log_cnt BIGINT
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = '0.11',
>   'connector.topic' = 'xxx',
>   'connector.properties.bootstrap.servers' = 'xxx',
>   'connector.properties.zookeeper.connect' = 'xxx',
>   'format.type' = 'json'
> )
> -----------------
>
> INSERT INTO sink_kafka
> SELECT
>  DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss')
> AS window_time,
>  server,
>  reason,
>  COUNT(DISTINCT role_id) AS role_id_distinct_cnt,
>  COUNT(1) AS log_cnt
> FROM source_kafka
> GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 使用滚动窗口的 Flink SQL State 一直增加

LakeShen
Hi,

看下是否存在热点问题,我看你根据 server,reason 这两个字段来进行 group by

Best,
LakeShen

Benchao Li <[hidden email]> 于2020年5月26日周二 下午6:50写道:

> Hi,
>
> 看起来你的写法应该没有太大问题。可能有两个问题需要确认一下:
> 1. 你的watermark生成的正确吗?也就是说window的结果有正常输出么?如果watermark延迟很高,是会导致有多个window同时存在的
> 2. 你是怎么判断state上升呢?通过checkpoint看出来的?还是看到heap一直上升?
>
> 瓜牛 <[hidden email]> 于2020年5月26日周二 下午6:07写道:
>
> > hi,大家好!
> >
> > 现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加
> >
> > SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和
> > role_id 的去重数
> >
> > 疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state
> > 大小却一直是增加的,是 SQL 写得有问题吗?
> >
> > 麻烦大家帮我看一下
> >
> > 谢谢!
> >
> > ----------------
> >
> > CREATE TABLE source_kafka (
> >   dtime string,
> >   wm as cast(dtime as TIMESTAMP(3)),
> >   server string,
> >   reason string,
> >   role_id string,
> >   WATERMARK FOR wm AS wm - INTERVAL '5' SECOND
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.version' = '0.11',
> >   'connector.topic' = 'xxx',
> >   'connector.properties.bootstrap.servers' = 'xxx',
> >   'connector.properties.zookeeper.connect' = 'xxx',
> >   'connector.properties.group.id' = 'xxx',
> >   'format.type' = 'json',
> > )
> > -----------------
> >
> > CREATE TABLE sink_kafka (
> >   window_time string,
> >   server string,
> >   reason string,
> >   role_id_distinct_cnt BIGINT,
> >   log_cnt BIGINT
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.version' = '0.11',
> >   'connector.topic' = 'xxx',
> >   'connector.properties.bootstrap.servers' = 'xxx',
> >   'connector.properties.zookeeper.connect' = 'xxx',
> >   'format.type' = 'json'
> > )
> > -----------------
> >
> > INSERT INTO sink_kafka
> > SELECT
> >  DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd
> HH:mm:ss')
> > AS window_time,
> >  server,
> >  reason,
> >  COUNT(DISTINCT role_id) AS role_id_distinct_cnt,
> >  COUNT(1) AS log_cnt
> > FROM source_kafka
> > GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason
>
>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: 使用滚动窗口的 Flink SQL State 一直增加

Benchao Li
Hi,

这应该是个bug,之前也有人跟我提过,我没在意。现在看来应该的确是bug,我在本地复现了一下。我建了一个issue[1] 来跟踪和修复。

[1] https://issues.apache.org/jira/browse/FLINK-17942

LakeShen <[hidden email]> 于2020年5月26日周二 下午8:14写道:

> Hi,
>
> 看下是否存在热点问题,我看你根据 server,reason 这两个字段来进行 group by
>
> Best,
> LakeShen
>
> Benchao Li <[hidden email]> 于2020年5月26日周二 下午6:50写道:
>
> > Hi,
> >
> > 看起来你的写法应该没有太大问题。可能有两个问题需要确认一下:
> > 1.
> 你的watermark生成的正确吗?也就是说window的结果有正常输出么?如果watermark延迟很高,是会导致有多个window同时存在的
> > 2. 你是怎么判断state上升呢?通过checkpoint看出来的?还是看到heap一直上升?
> >
> > 瓜牛 <[hidden email]> 于2020年5月26日周二 下午6:07写道:
> >
> > > hi,大家好!
> > >
> > > 现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加
> > >
> > > SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和
> > > role_id 的去重数
> > >
> > > 疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state
> > > 大小却一直是增加的,是 SQL 写得有问题吗?
> > >
> > > 麻烦大家帮我看一下
> > >
> > > 谢谢!
> > >
> > > ----------------
> > >
> > > CREATE TABLE source_kafka (
> > >   dtime string,
> > >   wm as cast(dtime as TIMESTAMP(3)),
> > >   server string,
> > >   reason string,
> > >   role_id string,
> > >   WATERMARK FOR wm AS wm - INTERVAL '5' SECOND
> > > ) WITH (
> > >   'connector.type' = 'kafka',
> > >   'connector.version' = '0.11',
> > >   'connector.topic' = 'xxx',
> > >   'connector.properties.bootstrap.servers' = 'xxx',
> > >   'connector.properties.zookeeper.connect' = 'xxx',
> > >   'connector.properties.group.id' = 'xxx',
> > >   'format.type' = 'json',
> > > )
> > > -----------------
> > >
> > > CREATE TABLE sink_kafka (
> > >   window_time string,
> > >   server string,
> > >   reason string,
> > >   role_id_distinct_cnt BIGINT,
> > >   log_cnt BIGINT
> > > ) WITH (
> > >   'connector.type' = 'kafka',
> > >   'connector.version' = '0.11',
> > >   'connector.topic' = 'xxx',
> > >   'connector.properties.bootstrap.servers' = 'xxx',
> > >   'connector.properties.zookeeper.connect' = 'xxx',
> > >   'format.type' = 'json'
> > > )
> > > -----------------
> > >
> > > INSERT INTO sink_kafka
> > > SELECT
> > >  DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd
> > HH:mm:ss')
> > > AS window_time,
> > >  server,
> > >  reason,
> > >  COUNT(DISTINCT role_id) AS role_id_distinct_cnt,
> > >  COUNT(1) AS log_cnt
> > > FROM source_kafka
> > > GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

回复: 使用滚动窗口的 Flink SQL State 一直增加

瓜牛
Hi,


好的,谢谢回复




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
发送时间:&nbsp;2020年5月26日(星期二) 晚上8:26
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 使用滚动窗口的 Flink SQL State 一直增加



Hi,

这应该是个bug,之前也有人跟我提过,我没在意。现在看来应该的确是bug,我在本地复现了一下。我建了一个issue[1] 来跟踪和修复。

[1] https://issues.apache.org/jira/browse/FLINK-17942

LakeShen <[hidden email]&gt; 于2020年5月26日周二 下午8:14写道:

&gt; Hi,
&gt;
&gt; 看下是否存在热点问题,我看你根据 server,reason 这两个字段来进行 group by
&gt;
&gt; Best,
&gt; LakeShen
&gt;
&gt; Benchao Li <[hidden email]&gt; 于2020年5月26日周二 下午6:50写道:
&gt;
&gt; &gt; Hi,
&gt; &gt;
&gt; &gt; 看起来你的写法应该没有太大问题。可能有两个问题需要确认一下:
&gt; &gt; 1.
&gt; 你的watermark生成的正确吗?也就是说window的结果有正常输出么?如果watermark延迟很高,是会导致有多个window同时存在的
&gt; &gt; 2. 你是怎么判断state上升呢?通过checkpoint看出来的?还是看到heap一直上升?
&gt; &gt;
&gt; &gt; 瓜牛 <[hidden email]&gt; 于2020年5月26日周二 下午6:07写道:
&gt; &gt;
&gt; &gt; &gt; hi,大家好!
&gt; &gt; &gt;
&gt; &gt; &gt; 现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加
&gt; &gt; &gt;
&gt; &gt; &gt; SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和
&gt; &gt; &gt; role_id 的去重数
&gt; &gt; &gt;
&gt; &gt; &gt; 疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state
&gt; &gt; &gt; 大小却一直是增加的,是 SQL 写得有问题吗?
&gt; &gt; &gt;
&gt; &gt; &gt; 麻烦大家帮我看一下
&gt; &gt; &gt;
&gt; &gt; &gt; 谢谢!
&gt; &gt; &gt;
&gt; &gt; &gt; ----------------
&gt; &gt; &gt;
&gt; &gt; &gt; CREATE TABLE source_kafka (
&gt; &gt; &gt;&nbsp;&nbsp; dtime string,
&gt; &gt; &gt;&nbsp;&nbsp; wm as cast(dtime as TIMESTAMP(3)),
&gt; &gt; &gt;&nbsp;&nbsp; server string,
&gt; &gt; &gt;&nbsp;&nbsp; reason string,
&gt; &gt; &gt;&nbsp;&nbsp; role_id string,
&gt; &gt; &gt;&nbsp;&nbsp; WATERMARK FOR wm AS wm - INTERVAL '5' SECOND
&gt; &gt; &gt; ) WITH (
&gt; &gt; &gt;&nbsp;&nbsp; 'connector.type' = 'kafka',
&gt; &gt; &gt;&nbsp;&nbsp; 'connector.version' = '0.11',
&gt; &gt; &gt;&nbsp;&nbsp; 'connector.topic' = 'xxx',
&gt; &gt; &gt;&nbsp;&nbsp; 'connector.properties.bootstrap.servers' = 'xxx',
&gt; &gt; &gt;&nbsp;&nbsp; 'connector.properties.zookeeper.connect' = 'xxx',
&gt; &gt; &gt;&nbsp;&nbsp; 'connector.properties.group.id' = 'xxx',
&gt; &gt; &gt;&nbsp;&nbsp; 'format.type' = 'json',
&gt; &gt; &gt; )
&gt; &gt; &gt; -----------------
&gt; &gt; &gt;
&gt; &gt; &gt; CREATE TABLE sink_kafka (
&gt; &gt; &gt;&nbsp;&nbsp; window_time string,
&gt; &gt; &gt;&nbsp;&nbsp; server string,
&gt; &gt; &gt;&nbsp;&nbsp; reason string,
&gt; &gt; &gt;&nbsp;&nbsp; role_id_distinct_cnt BIGINT,
&gt; &gt; &gt;&nbsp;&nbsp; log_cnt BIGINT
&gt; &gt; &gt; ) WITH (
&gt; &gt; &gt;&nbsp;&nbsp; 'connector.type' = 'kafka',
&gt; &gt; &gt;&nbsp;&nbsp; 'connector.version' = '0.11',
&gt; &gt; &gt;&nbsp;&nbsp; 'connector.topic' = 'xxx',
&gt; &gt; &gt;&nbsp;&nbsp; 'connector.properties.bootstrap.servers' = 'xxx',
&gt; &gt; &gt;&nbsp;&nbsp; 'connector.properties.zookeeper.connect' = 'xxx',
&gt; &gt; &gt;&nbsp;&nbsp; 'format.type' = 'json'
&gt; &gt; &gt; )
&gt; &gt; &gt; -----------------
&gt; &gt; &gt;
&gt; &gt; &gt; INSERT INTO sink_kafka
&gt; &gt; &gt; SELECT
&gt; &gt; &gt;&nbsp; DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd
&gt; &gt; HH:mm:ss')
&gt; &gt; &gt; AS window_time,
&gt; &gt; &gt;&nbsp; server,
&gt; &gt; &gt;&nbsp; reason,
&gt; &gt; &gt;&nbsp; COUNT(DISTINCT role_id) AS role_id_distinct_cnt,
&gt; &gt; &gt;&nbsp; COUNT(1) AS log_cnt
&gt; &gt; &gt; FROM source_kafka
&gt; &gt; &gt; GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; --
&gt; &gt;
&gt; &gt; Best,
&gt; &gt; Benchao Li
&gt; &gt;
&gt;


--

Best,
Benchao Li