hi,大家好!
现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加 SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和 role_id 的去重数 疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state 大小却一直是增加的,是 SQL 写得有问题吗? 麻烦大家帮我看一下 谢谢! ---------------- CREATE TABLE source_kafka ( dtime string, wm as cast(dtime as TIMESTAMP(3)), server string, reason string, role_id string, WATERMARK FOR wm AS wm - INTERVAL '5' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'xxx', 'connector.properties.bootstrap.servers' = 'xxx', 'connector.properties.zookeeper.connect' = 'xxx', 'connector.properties.group.id' = 'xxx', 'format.type' = 'json', ) ----------------- CREATE TABLE sink_kafka ( window_time string, server string, reason string, role_id_distinct_cnt BIGINT, log_cnt BIGINT ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'xxx', 'connector.properties.bootstrap.servers' = 'xxx', 'connector.properties.zookeeper.connect' = 'xxx', 'format.type' = 'json' ) ----------------- INSERT INTO sink_kafka SELECT DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss') AS window_time, server, reason, COUNT(DISTINCT role_id) AS role_id_distinct_cnt, COUNT(1) AS log_cnt FROM source_kafka GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason |
Hi,
看起来你的写法应该没有太大问题。可能有两个问题需要确认一下: 1. 你的watermark生成的正确吗?也就是说window的结果有正常输出么?如果watermark延迟很高,是会导致有多个window同时存在的 2. 你是怎么判断state上升呢?通过checkpoint看出来的?还是看到heap一直上升? 瓜牛 <[hidden email]> 于2020年5月26日周二 下午6:07写道: > hi,大家好! > > 现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加 > > SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和 > role_id 的去重数 > > 疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state > 大小却一直是增加的,是 SQL 写得有问题吗? > > 麻烦大家帮我看一下 > > 谢谢! > > ---------------- > > CREATE TABLE source_kafka ( > dtime string, > wm as cast(dtime as TIMESTAMP(3)), > server string, > reason string, > role_id string, > WATERMARK FOR wm AS wm - INTERVAL '5' SECOND > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11', > 'connector.topic' = 'xxx', > 'connector.properties.bootstrap.servers' = 'xxx', > 'connector.properties.zookeeper.connect' = 'xxx', > 'connector.properties.group.id' = 'xxx', > 'format.type' = 'json', > ) > ----------------- > > CREATE TABLE sink_kafka ( > window_time string, > server string, > reason string, > role_id_distinct_cnt BIGINT, > log_cnt BIGINT > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11', > 'connector.topic' = 'xxx', > 'connector.properties.bootstrap.servers' = 'xxx', > 'connector.properties.zookeeper.connect' = 'xxx', > 'format.type' = 'json' > ) > ----------------- > > INSERT INTO sink_kafka > SELECT > DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss') > AS window_time, > server, > reason, > COUNT(DISTINCT role_id) AS role_id_distinct_cnt, > COUNT(1) AS log_cnt > FROM source_kafka > GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason -- Best, Benchao Li |
Hi,
看下是否存在热点问题,我看你根据 server,reason 这两个字段来进行 group by Best, LakeShen Benchao Li <[hidden email]> 于2020年5月26日周二 下午6:50写道: > Hi, > > 看起来你的写法应该没有太大问题。可能有两个问题需要确认一下: > 1. 你的watermark生成的正确吗?也就是说window的结果有正常输出么?如果watermark延迟很高,是会导致有多个window同时存在的 > 2. 你是怎么判断state上升呢?通过checkpoint看出来的?还是看到heap一直上升? > > 瓜牛 <[hidden email]> 于2020年5月26日周二 下午6:07写道: > > > hi,大家好! > > > > 现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加 > > > > SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和 > > role_id 的去重数 > > > > 疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state > > 大小却一直是增加的,是 SQL 写得有问题吗? > > > > 麻烦大家帮我看一下 > > > > 谢谢! > > > > ---------------- > > > > CREATE TABLE source_kafka ( > > dtime string, > > wm as cast(dtime as TIMESTAMP(3)), > > server string, > > reason string, > > role_id string, > > WATERMARK FOR wm AS wm - INTERVAL '5' SECOND > > ) WITH ( > > 'connector.type' = 'kafka', > > 'connector.version' = '0.11', > > 'connector.topic' = 'xxx', > > 'connector.properties.bootstrap.servers' = 'xxx', > > 'connector.properties.zookeeper.connect' = 'xxx', > > 'connector.properties.group.id' = 'xxx', > > 'format.type' = 'json', > > ) > > ----------------- > > > > CREATE TABLE sink_kafka ( > > window_time string, > > server string, > > reason string, > > role_id_distinct_cnt BIGINT, > > log_cnt BIGINT > > ) WITH ( > > 'connector.type' = 'kafka', > > 'connector.version' = '0.11', > > 'connector.topic' = 'xxx', > > 'connector.properties.bootstrap.servers' = 'xxx', > > 'connector.properties.zookeeper.connect' = 'xxx', > > 'format.type' = 'json' > > ) > > ----------------- > > > > INSERT INTO sink_kafka > > SELECT > > DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd > HH:mm:ss') > > AS window_time, > > server, > > reason, > > COUNT(DISTINCT role_id) AS role_id_distinct_cnt, > > COUNT(1) AS log_cnt > > FROM source_kafka > > GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason > > > > -- > > Best, > Benchao Li > |
Hi,
这应该是个bug,之前也有人跟我提过,我没在意。现在看来应该的确是bug,我在本地复现了一下。我建了一个issue[1] 来跟踪和修复。 [1] https://issues.apache.org/jira/browse/FLINK-17942 LakeShen <[hidden email]> 于2020年5月26日周二 下午8:14写道: > Hi, > > 看下是否存在热点问题,我看你根据 server,reason 这两个字段来进行 group by > > Best, > LakeShen > > Benchao Li <[hidden email]> 于2020年5月26日周二 下午6:50写道: > > > Hi, > > > > 看起来你的写法应该没有太大问题。可能有两个问题需要确认一下: > > 1. > 你的watermark生成的正确吗?也就是说window的结果有正常输出么?如果watermark延迟很高,是会导致有多个window同时存在的 > > 2. 你是怎么判断state上升呢?通过checkpoint看出来的?还是看到heap一直上升? > > > > 瓜牛 <[hidden email]> 于2020年5月26日周二 下午6:07写道: > > > > > hi,大家好! > > > > > > 现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加 > > > > > > SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和 > > > role_id 的去重数 > > > > > > 疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state > > > 大小却一直是增加的,是 SQL 写得有问题吗? > > > > > > 麻烦大家帮我看一下 > > > > > > 谢谢! > > > > > > ---------------- > > > > > > CREATE TABLE source_kafka ( > > > dtime string, > > > wm as cast(dtime as TIMESTAMP(3)), > > > server string, > > > reason string, > > > role_id string, > > > WATERMARK FOR wm AS wm - INTERVAL '5' SECOND > > > ) WITH ( > > > 'connector.type' = 'kafka', > > > 'connector.version' = '0.11', > > > 'connector.topic' = 'xxx', > > > 'connector.properties.bootstrap.servers' = 'xxx', > > > 'connector.properties.zookeeper.connect' = 'xxx', > > > 'connector.properties.group.id' = 'xxx', > > > 'format.type' = 'json', > > > ) > > > ----------------- > > > > > > CREATE TABLE sink_kafka ( > > > window_time string, > > > server string, > > > reason string, > > > role_id_distinct_cnt BIGINT, > > > log_cnt BIGINT > > > ) WITH ( > > > 'connector.type' = 'kafka', > > > 'connector.version' = '0.11', > > > 'connector.topic' = 'xxx', > > > 'connector.properties.bootstrap.servers' = 'xxx', > > > 'connector.properties.zookeeper.connect' = 'xxx', > > > 'format.type' = 'json' > > > ) > > > ----------------- > > > > > > INSERT INTO sink_kafka > > > SELECT > > > DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd > > HH:mm:ss') > > > AS window_time, > > > server, > > > reason, > > > COUNT(DISTINCT role_id) AS role_id_distinct_cnt, > > > COUNT(1) AS log_cnt > > > FROM source_kafka > > > GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li |
Hi,
好的,谢谢回复 ------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年5月26日(星期二) 晚上8:26 收件人: "user-zh"<[hidden email]>; 主题: Re: 使用滚动窗口的 Flink SQL State 一直增加 Hi, 这应该是个bug,之前也有人跟我提过,我没在意。现在看来应该的确是bug,我在本地复现了一下。我建了一个issue[1] 来跟踪和修复。 [1] https://issues.apache.org/jira/browse/FLINK-17942 LakeShen <[hidden email]> 于2020年5月26日周二 下午8:14写道: > Hi, > > 看下是否存在热点问题,我看你根据 server,reason 这两个字段来进行 group by > > Best, > LakeShen > > Benchao Li <[hidden email]> 于2020年5月26日周二 下午6:50写道: > > > Hi, > > > > 看起来你的写法应该没有太大问题。可能有两个问题需要确认一下: > > 1. > 你的watermark生成的正确吗?也就是说window的结果有正常输出么?如果watermark延迟很高,是会导致有多个window同时存在的 > > 2. 你是怎么判断state上升呢?通过checkpoint看出来的?还是看到heap一直上升? > > > > 瓜牛 <[hidden email]> 于2020年5月26日周二 下午6:07写道: > > > > > hi,大家好! > > > > > > 现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加 > > > > > > SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和 > > > role_id 的去重数 > > > > > > 疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state > > > 大小却一直是增加的,是 SQL 写得有问题吗? > > > > > > 麻烦大家帮我看一下 > > > > > > 谢谢! > > > > > > ---------------- > > > > > > CREATE TABLE source_kafka ( > > > dtime string, > > > wm as cast(dtime as TIMESTAMP(3)), > > > server string, > > > reason string, > > > role_id string, > > > WATERMARK FOR wm AS wm - INTERVAL '5' SECOND > > > ) WITH ( > > > 'connector.type' = 'kafka', > > > 'connector.version' = '0.11', > > > 'connector.topic' = 'xxx', > > > 'connector.properties.bootstrap.servers' = 'xxx', > > > 'connector.properties.zookeeper.connect' = 'xxx', > > > 'connector.properties.group.id' = 'xxx', > > > 'format.type' = 'json', > > > ) > > > ----------------- > > > > > > CREATE TABLE sink_kafka ( > > > window_time string, > > > server string, > > > reason string, > > > role_id_distinct_cnt BIGINT, > > > log_cnt BIGINT > > > ) WITH ( > > > 'connector.type' = 'kafka', > > > 'connector.version' = '0.11', > > > 'connector.topic' = 'xxx', > > > 'connector.properties.bootstrap.servers' = 'xxx', > > > 'connector.properties.zookeeper.connect' = 'xxx', > > > 'format.type' = 'json' > > > ) > > > ----------------- > > > > > > INSERT INTO sink_kafka > > > SELECT > > > DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd > > HH:mm:ss') > > > AS window_time, > > > server, > > > reason, > > > COUNT(DISTINCT role_id) AS role_id_distinct_cnt, > > > COUNT(1) AS log_cnt > > > FROM source_kafka > > > GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li |
Free forum by Nabble | Edit this page |