[flink-1.10.2] Blink SQL 超用内存严重

classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

[flink-1.10.2] Blink SQL 超用内存严重

Tianwang Li
使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。


【join】

> SELECT `b`.`rowtime`,
> `a`.`c_id`,
> `b`.`openid`
> FROM `test_table_a` AS `a`
> INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> AND `a`.`openid` = `b`.`openid`
> AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> AND `a`.`rowtime` + INTERVAL '6' HOUR
>
>
【window】

> SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `rowtime`,
> HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `__windoow_start__`,
> HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `__window_end__`,
> `c_id`,
> COUNT(`openid`) AS `cnt`
> FROM `test_table_in_6h`
> GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> `c_id`
>


我配置了Fink的内存是4G, 实际使用达到了6.8 G。
同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右

【配置】

> cat conf/flink-conf.yaml
> jobmanager.rpc.address: flink-jobmanager
> taskmanager.numberOfTaskSlots: 1
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> jobmanager.heap.size: 6144m
> taskmanager.memory.process.size: 4g
> taskmanager.memory.jvm-overhead.min: 1024m
> taskmanager.memory.jvm-overhead.max: 2048m
> taskmanager.debug.memory.log-interval: 10000
> env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
> -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
>



--
**************************************
 tivanli
**************************************
Reply | Threaded
Open this post in threaded view
|

Re: [flink-1.10.2] Blink SQL 超用内存严重

Benchao Li-2
Hi Tianwang,

不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
    join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
    `Math.max(leftRelativeSize, rightRelativeSize) +
allowedLateness`,根据你的SQL,这个值应该是6h
2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
    清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
    数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
    `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
2;`,在你的SQL来讲,就是3h,也就是说
    状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

希望这个可以解答你的疑惑~

[1] https://issues.apache.org/jira/browse/FLINK-18996

Tianwang Li <[hidden email]> 于2020年9月22日周二 下午8:26写道:

> 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>
>
> 【join】
>
> > SELECT `b`.`rowtime`,
> > `a`.`c_id`,
> > `b`.`openid`
> > FROM `test_table_a` AS `a`
> > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > AND `a`.`openid` = `b`.`openid`
> > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> > AND `a`.`rowtime` + INTERVAL '6' HOUR
> >
> >
> 【window】
>
> > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `rowtime`,
> > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__windoow_start__`,
> > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__window_end__`,
> > `c_id`,
> > COUNT(`openid`) AS `cnt`
> > FROM `test_table_in_6h`
> > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > `c_id`
> >
>
>
> 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>
> 【配置】
>
> > cat conf/flink-conf.yaml
> > jobmanager.rpc.address: flink-jobmanager
> > taskmanager.numberOfTaskSlots: 1
> > blob.server.port: 6124
> > jobmanager.rpc.port: 6123
> > taskmanager.rpc.port: 6122
> > jobmanager.heap.size: 6144m
> > taskmanager.memory.process.size: 4g
> > taskmanager.memory.jvm-overhead.min: 1024m
> > taskmanager.memory.jvm-overhead.max: 2048m
> > taskmanager.debug.memory.log-interval: 10000
> > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **************************************
>  tivanli
> **************************************
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

回复:[flink-1.10.2] Blink SQL 超用内存严重

郑斌斌
 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
单流跑的话,比较正常。
JOB的内存是4G。版本1.11.1
------------------------------------------------------------------
发件人:Benchao Li <[hidden email]>
发送时间:2020年9月23日(星期三) 10:50
收件人:user-zh <[hidden email]>
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

Hi Tianwang,

不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
    join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
    `Math.max(leftRelativeSize, rightRelativeSize) +
allowedLateness`,根据你的SQL,这个值应该是6h
2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
    清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
    数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
    `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
2;`,在你的SQL来讲,就是3h,也就是说
    状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

希望这个可以解答你的疑惑~

[1] https://issues.apache.org/jira/browse/FLINK-18996

Tianwang Li <[hidden email]> 于2020年9月22日周二 下午8:26写道:

> 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>
>
> 【join】
>
> > SELECT `b`.`rowtime`,
> > `a`.`c_id`,
> > `b`.`openid`
> > FROM `test_table_a` AS `a`
> > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > AND `a`.`openid` = `b`.`openid`
> > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> > AND `a`.`rowtime` + INTERVAL '6' HOUR
> >
> >
> 【window】
>
> > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `rowtime`,
> > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__windoow_start__`,
> > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__window_end__`,
> > `c_id`,
> > COUNT(`openid`) AS `cnt`
> > FROM `test_table_in_6h`
> > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > `c_id`
> >
>
>
> 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>
> 【配置】
>
> > cat conf/flink-conf.yaml
> > jobmanager.rpc.address: flink-jobmanager
> > taskmanager.numberOfTaskSlots: 1
> > blob.server.port: 6124
> > jobmanager.rpc.port: 6123
> > taskmanager.rpc.port: 6122
> > jobmanager.heap.size: 6144m
> > taskmanager.memory.process.size: 4g
> > taskmanager.memory.jvm-overhead.min: 1024m
> > taskmanager.memory.jvm-overhead.max: 2048m
> > taskmanager.debug.memory.log-interval: 10000
> > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **************************************
>  tivanli
> **************************************
>


--

Best,
Benchao Li

Reply | Threaded
Open this post in threaded view
|

Re: [flink-1.10.2] Blink SQL 超用内存严重

Tianwang Li
In reply to this post by Benchao Li-2
Hi Benchao, 感谢你的回复

我使用的RocksDB,内存 overhead 太多了, 什么地方超用了那么多内存,好像不受控制了。
另外,我也测试过hop窗口,也是超用内存比较。没有使用增量checkpoint。


最后,我这边的interval join 是 inner join ,使用的 b 表的rowtime作为时间,没有观察到延迟数据的情况。






Benchao Li <[hidden email]> 于2020年9月23日周三 上午10:50写道:
Hi Tianwang,

不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
    join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
    `Math.max(leftRelativeSize, rightRelativeSize) +
allowedLateness`,根据你的SQL,这个值应该是6h
2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
    清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
    数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
    `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
2;`,在你的SQL来讲,就是3h,也就是说
    状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

希望这个可以解答你的疑惑~

[1] https://issues.apache.org/jira/browse/FLINK-18996

Tianwang Li <[hidden email]> 于2020年9月22日周二 下午8:26写道:

> 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>
>
> 【join】
>
> > SELECT `b`.`rowtime`,
> > `a`.`c_id`,
> > `b`.`openid`
> > FROM `test_table_a` AS `a`
> > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > AND `a`.`openid` = `b`.`openid`
> > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> > AND `a`.`rowtime` + INTERVAL '6' HOUR
> >
> >
> 【window】
>
> > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `rowtime`,
> > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__windoow_start__`,
> > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__window_end__`,
> > `c_id`,
> > COUNT(`openid`) AS `cnt`
> > FROM `test_table_in_6h`
> > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > `c_id`
> >
>
>
> 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>
> 【配置】
>
> > cat conf/flink-conf.yaml
> > jobmanager.rpc.address: flink-jobmanager
> > taskmanager.numberOfTaskSlots: 1
> > blob.server.port: 6124
> > jobmanager.rpc.port: 6123
> > taskmanager.rpc.port: 6122
> > jobmanager.heap.size: 6144m
> > taskmanager.memory.process.size: 4g
> > taskmanager.memory.jvm-overhead.min: 1024m
> > taskmanager.memory.jvm-overhead.max: 2048m
> > taskmanager.debug.memory.log-interval: 10000
> > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **************************************
>  tivanli
> **************************************
>


--

Best,
Benchao Li


--
**************************************
 tivanli
**************************************
Reply | Threaded
Open this post in threaded view
|

Re: [flink-1.10.2] Blink SQL 超用内存严重

Benchao Li-2
In reply to this post by 郑斌斌
超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。

郑斌斌 <[hidden email]> 于2020年9月23日周三 下午12:29写道:

>  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
> KILL 。
> 单流跑的话,比较正常。
> JOB的内存是4G。版本1.11.1
> ------------------------------------------------------------------
> 发件人:Benchao Li <[hidden email]>
> 发送时间:2020年9月23日(星期三) 10:50
> 收件人:user-zh <[hidden email]>
> 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
> Hi Tianwang,
>
> 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
> 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
>     join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
>     `Math.max(leftRelativeSize, rightRelativeSize) +
> allowedLateness`,根据你的SQL,这个值应该是6h
> 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
>     清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
>     数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
>     `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> 2;`,在你的SQL来讲,就是3h,也就是说
>     状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>
> 希望这个可以解答你的疑惑~
>
> [1] https://issues.apache.org/jira/browse/FLINK-18996
>
> Tianwang Li <[hidden email]> 于2020年9月22日周二 下午8:26写道:
>
> > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> >
> >
> > 【join】
> >
> > > SELECT `b`.`rowtime`,
> > > `a`.`c_id`,
> > > `b`.`openid`
> > > FROM `test_table_a` AS `a`
> > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > AND `a`.`openid` = `b`.`openid`
> > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> SECOND
> > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > >
> > >
> > 【window】
> >
> > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> AS
> > > `rowtime`,
> > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__windoow_start__`,
> > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__window_end__`,
> > > `c_id`,
> > > COUNT(`openid`) AS `cnt`
> > > FROM `test_table_in_6h`
> > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > `c_id`
> > >
> >
> >
> > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
> >
> > 【配置】
> >
> > > cat conf/flink-conf.yaml
> > > jobmanager.rpc.address: flink-jobmanager
> > > taskmanager.numberOfTaskSlots: 1
> > > blob.server.port: 6124
> > > jobmanager.rpc.port: 6123
> > > taskmanager.rpc.port: 6122
> > > jobmanager.heap.size: 6144m
> > > taskmanager.memory.process.size: 4g
> > > taskmanager.memory.jvm-overhead.min: 1024m
> > > taskmanager.memory.jvm-overhead.max: 2048m
> > > taskmanager.debug.memory.log-interval: 10000
> > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > -XX:NumberOfGCLogFiles=10
> > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > >
> >
> >
> >
> > --
> > **************************************
> >  tivanli
> > **************************************
> >
>
>
> --
>
> Best,
> Benchao Li
>
>

--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

回复:[flink-1.10.2] Blink SQL 超用内存严重

郑斌斌
谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state, checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来

java.lang.Exception: [2020-09-14 09:27:20.431]Container [pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is running 36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of 4 GB physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239 /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0 -Djobmanager.rpc.address=njdev-nn03.nj -Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c -Djobmanager.rpc.port=30246 -Drest.address=flink-node03
 |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0' -Djobmanager.rpc.address='flink-node03' -Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c' -Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out 2> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
[2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
[2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143.

 at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
------------------------------------------------------------------
发件人:Benchao Li <[hidden email]>
发送时间:2020年9月23日(星期三) 13:12
收件人:user-zh <[hidden email]>; 郑斌斌 <[hidden email]>
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
郑斌斌 <[hidden email]> 于2020年9月23日周三 下午12:29写道:
 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
 单流跑的话,比较正常。
 JOB的内存是4G。版本1.11.1
 ------------------------------------------------------------------
 发件人:Benchao Li <[hidden email]>
 发送时间:2020年9月23日(星期三) 10:50
 收件人:user-zh <[hidden email]>
 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

 Hi Tianwang,

 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
     join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
     `Math.max(leftRelativeSize, rightRelativeSize) +
 allowedLateness`,根据你的SQL,这个值应该是6h
 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
     清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
     数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
     `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
 2;`,在你的SQL来讲,就是3h,也就是说
     状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

 希望这个可以解答你的疑惑~

 [1] https://issues.apache.org/jira/browse/FLINK-18996

 Tianwang Li <[hidden email]> 于2020年9月22日周二 下午8:26写道:

 > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
 >
 >
 > 【join】
 >
 > > SELECT `b`.`rowtime`,
 > > `a`.`c_id`,
 > > `b`.`openid`
 > > FROM `test_table_a` AS `a`
 > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
 > > AND `a`.`openid` = `b`.`openid`
 > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
 > > AND `a`.`rowtime` + INTERVAL '6' HOUR
 > >
 > >
 > 【window】
 >
 > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
 > > `rowtime`,
 > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
 > > `__windoow_start__`,
 > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
 > > `__window_end__`,
 > > `c_id`,
 > > COUNT(`openid`) AS `cnt`
 > > FROM `test_table_in_6h`
 > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
 > > `c_id`
 > >
 >
 >
 > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
 > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
 >
 > 【配置】
 >
 > > cat conf/flink-conf.yaml
 > > jobmanager.rpc.address: flink-jobmanager
 > > taskmanager.numberOfTaskSlots: 1
 > > blob.server.port: 6124
 > > jobmanager.rpc.port: 6123
 > > taskmanager.rpc.port: 6122
 > > jobmanager.heap.size: 6144m
 > > taskmanager.memory.process.size: 4g
 > > taskmanager.memory.jvm-overhead.min: 1024m
 > > taskmanager.memory.jvm-overhead.max: 2048m
 > > taskmanager.debug.memory.log-interval: 10000
 > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
 > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
 > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
 > -XX:NumberOfGCLogFiles=10
 > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
 > >
 >
 >
 >
 > --
 > **************************************
 >  tivanli
 > **************************************
 >


 --

 Best,
 Benchao Li



--

Best,
Benchao Li

Reply | Threaded
Open this post in threaded view
|

Re: [flink-1.10.2] Blink SQL 超用内存严重

Peidian Li
我也遇到过类似的问题,也是使用rocksdb,flink 1.10版本,我理解的是block cache超用,我这边的解决办法是增大了
taskmanager.memory.jvm-overhead.fraction
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-overhead-fraction>
,如果仍然出现内存超用这个问题,可以尝试调大taskmanager.memory.task.off-heap.size
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-task-off-heap-size>
,我这边这两个参数配置分别为taskmanager.memory.jvm-overhead.fraction=0.2,taskmanager.memory.task.off-heap.size=128m

可以尝试一下。

郑斌斌 <[hidden email]> 于2020年9月23日周三 下午1:33写道:

> 谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state,
> checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来
>
> java.lang.Exception: [2020-09-14 09:27:20.431]Container
> [pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is
> running 36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of
> 4 GB physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing
> container.
> Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
>  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>  |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239
> /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798
> -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner -D
> taskmanager.memory.framework.off-heap.size=134217728b -D
> taskmanager.memory.network.max=359703515b -D
> taskmanager.memory.network.min=359703515b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=1530082070b -D
> taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0
> -Djobmanager.rpc.address=njdev-nn03.nj
> -Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c
> -Djobmanager.rpc.port=30246 -Drest.address=flink-node03
>  |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c
> /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798
> -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner -D
> taskmanager.memory.framework.off-heap.size=134217728b -D
> taskmanager.memory.network.max=359703515b -D
> taskmanager.memory.network.min=359703515b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=1530082070b -D
> taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0'
> -Djobmanager.rpc.address='flink-node03'
> -Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c'
> -Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1>
> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out
> 2>
> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
>
> [2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
> [2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143.
>
>  at
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> ------------------------------------------------------------------
> 发件人:Benchao Li <[hidden email]>
> 发送时间:2020年9月23日(星期三) 13:12
> 收件人:user-zh <[hidden email]>; 郑斌斌 <[hidden email]>
> 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
> 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
> 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
> 郑斌斌 <[hidden email]> 于2020年9月23日周三 下午12:29写道:
>  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
> KILL 。
>  单流跑的话,比较正常。
>  JOB的内存是4G。版本1.11.1
>  ------------------------------------------------------------------
>  发件人:Benchao Li <[hidden email]>
>  发送时间:2020年9月23日(星期三) 10:50
>  收件人:user-zh <[hidden email]>
>  主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
>  Hi Tianwang,
>
>  不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
>  1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
>      join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
>      `Math.max(leftRelativeSize, rightRelativeSize) +
>  allowedLateness`,根据你的SQL,这个值应该是6h
>  2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
>      清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
>      数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
>      `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
>  2;`,在你的SQL来讲,就是3h,也就是说
>      状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>
>  希望这个可以解答你的疑惑~
>
>  [1] https://issues.apache.org/jira/browse/FLINK-18996
>
>  Tianwang Li <[hidden email]> 于2020年9月22日周二 下午8:26写道:
>
>  > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>  >
>  >
>  > 【join】
>  >
>  > > SELECT `b`.`rowtime`,
>  > > `a`.`c_id`,
>  > > `b`.`openid`
>  > > FROM `test_table_a` AS `a`
>  > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
>  > > AND `a`.`openid` = `b`.`openid`
>  > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> SECOND
>  > > AND `a`.`rowtime` + INTERVAL '6' HOUR
>  > >
>  > >
>  > 【window】
>  >
>  > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> AS
>  > > `rowtime`,
>  > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
>  > > `__windoow_start__`,
>  > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
>  > > `__window_end__`,
>  > > `c_id`,
>  > > COUNT(`openid`) AS `cnt`
>  > > FROM `test_table_in_6h`
>  > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
>  > > `c_id`
>  > >
>  >
>  >
>  > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
>  > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>  >
>  > 【配置】
>  >
>  > > cat conf/flink-conf.yaml
>  > > jobmanager.rpc.address: flink-jobmanager
>  > > taskmanager.numberOfTaskSlots: 1
>  > > blob.server.port: 6124
>  > > jobmanager.rpc.port: 6123
>  > > taskmanager.rpc.port: 6122
>  > > jobmanager.heap.size: 6144m
>  > > taskmanager.memory.process.size: 4g
>  > > taskmanager.memory.jvm-overhead.min: 1024m
>  > > taskmanager.memory.jvm-overhead.max: 2048m
>  > > taskmanager.debug.memory.log-interval: 10000
>  > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
>  > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
>  > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
>  > -XX:NumberOfGCLogFiles=10
>  > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
>  > >
>  >
>  >
>  >
>  > --
>  > **************************************
>  >  tivanli
>  > **************************************
>  >
>
>
>  --
>
>  Best,
>  Benchao Li
>
>
>
> --
>
> Best,
> Benchao Li
>
>

--
-----------
Best  Regards
Peidian Li
Reply | Threaded
Open this post in threaded view
|

回复:[flink-1.10.2] Blink SQL 超用内存严重

郑斌斌
  谢谢Peidian ,我试一下
------------------------------------------------------------------
发件人:Peidian Li <[hidden email]>
发送时间:2020年9月23日(星期三) 14:02
收件人:user-zh <[hidden email]>; 郑斌斌 <[hidden email]>
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

我也遇到过类似的问题,也是使用rocksdb,flink 1.10版本,我理解的是block cache超用,我这边的解决办法是增大了taskmanager.memory.jvm-overhead.fraction,如果仍然出现内存超用这个问题,可以尝试调大taskmanager.memory.task.off-heap.size,我这边这两个参数配置分别为taskmanager.memory.jvm-overhead.fraction=0.2,taskmanager.memory.task.off-heap.size=128m

可以尝试一下。
郑斌斌 <[hidden email]> 于2020年9月23日周三 下午1:33写道:
谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state, checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来

 java.lang.Exception: [2020-09-14 09:27:20.431]Container [pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is running 36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of 4 GB physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing container.
 Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
  |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239 /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0 -Djobmanager.rpc.address=njdev-nn03.nj -Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c -Djobmanager.rpc.port=30246 -Drest.address=flink-node03
  |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0' -Djobmanager.rpc.address='flink-node03' -Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c' -Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out 2> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
 [2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
 [2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143.

  at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 ------------------------------------------------------------------
 发件人:Benchao Li <[hidden email]>
 发送时间:2020年9月23日(星期三) 13:12
 收件人:user-zh <[hidden email]>; 郑斌斌 <[hidden email]>
 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
 郑斌斌 <[hidden email]> 于2020年9月23日周三 下午12:29写道:
  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
  单流跑的话,比较正常。
  JOB的内存是4G。版本1.11.1
  ------------------------------------------------------------------
  发件人:Benchao Li <[hidden email]>
  发送时间:2020年9月23日(星期三) 10:50
  收件人:user-zh <[hidden email]>
  主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

  Hi Tianwang,

  不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

  1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
      join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
      `Math.max(leftRelativeSize, rightRelativeSize) +
  allowedLateness`,根据你的SQL,这个值应该是6h
  2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
      清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
      数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
      `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
  2;`,在你的SQL来讲,就是3h,也就是说
      状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

  希望这个可以解答你的疑惑~

  [1] https://issues.apache.org/jira/browse/FLINK-18996

  Tianwang Li <[hidden email]> 于2020年9月22日周二 下午8:26写道:

  > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
  >
  >
  > 【join】
  >
  > > SELECT `b`.`rowtime`,
  > > `a`.`c_id`,
  > > `b`.`openid`
  > > FROM `test_table_a` AS `a`
  > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
  > > AND `a`.`openid` = `b`.`openid`
  > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
  > > AND `a`.`rowtime` + INTERVAL '6' HOUR
  > >
  > >
  > 【window】
  >
  > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
  > > `rowtime`,
  > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
  > > `__windoow_start__`,
  > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
  > > `__window_end__`,
  > > `c_id`,
  > > COUNT(`openid`) AS `cnt`
  > > FROM `test_table_in_6h`
  > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
  > > `c_id`
  > >
  >
  >
  > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
  > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
  >
  > 【配置】
  >
  > > cat conf/flink-conf.yaml
  > > jobmanager.rpc.address: flink-jobmanager
  > > taskmanager.numberOfTaskSlots: 1
  > > blob.server.port: 6124
  > > jobmanager.rpc.port: 6123
  > > taskmanager.rpc.port: 6122
  > > jobmanager.heap.size: 6144m
  > > taskmanager.memory.process.size: 4g
  > > taskmanager.memory.jvm-overhead.min: 1024m
  > > taskmanager.memory.jvm-overhead.max: 2048m
  > > taskmanager.debug.memory.log-interval: 10000
  > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
  > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
  > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
  > -XX:NumberOfGCLogFiles=10
  > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
  > >
  >
  >
  >
  > --
  > **************************************
  >  tivanli
  > **************************************
  >


  --

  Best,
  Benchao Li



 --

 Best,
 Benchao Li



--
-----------Best  Regards
Peidian Li

Reply | Threaded
Open this post in threaded view
|

Re: [flink-1.10.2] Blink SQL 超用内存严重

Tianwang Li
In reply to this post by Benchao Li-2
使用的是 `RocksDBStateBackend`, 是什么超用了内存, 配置了“taskmanager.memory.process.size:
4g”,
并且有预留 1G 用于jvm-overhead。
现在超了2.8G,是什么超用的,我想了解一下。
如果控制不了,很容易被资源系统(yarn、k8s等) kill 了。


有没有,其他人有这方面的经验。



Benchao Li <[hidden email]> 于2020年9月23日周三 下午1:12写道:

> 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
> 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
>
> 郑斌斌 <[hidden email]> 于2020年9月23日周三 下午12:29写道:
>
> >  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
> > KILL 。
> > 单流跑的话,比较正常。
> > JOB的内存是4G。版本1.11.1
> > ------------------------------------------------------------------
> > 发件人:Benchao Li <[hidden email]>
> > 发送时间:2020年9月23日(星期三) 10:50
> > 收件人:user-zh <[hidden email]>
> > 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
> >
> > Hi Tianwang,
> >
> > 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
> >
> > 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
> >     join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
> >     `Math.max(leftRelativeSize, rightRelativeSize) +
> > allowedLateness`,根据你的SQL,这个值应该是6h
> > 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
> >     清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
> >     数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
> >     `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> > 2;`,在你的SQL来讲,就是3h,也就是说
> >     状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
> >
> > 希望这个可以解答你的疑惑~
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-18996
> >
> > Tianwang Li <[hidden email]> 于2020年9月22日周二 下午8:26写道:
> >
> > > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> > >
> > >
> > > 【join】
> > >
> > > > SELECT `b`.`rowtime`,
> > > > `a`.`c_id`,
> > > > `b`.`openid`
> > > > FROM `test_table_a` AS `a`
> > > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > > AND `a`.`openid` = `b`.`openid`
> > > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> > SECOND
> > > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > > >
> > > >
> > > 【window】
> > >
> > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> > AS
> > > > `rowtime`,
> > > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > > `__windoow_start__`,
> > > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > > `__window_end__`,
> > > > `c_id`,
> > > > COUNT(`openid`) AS `cnt`
> > > > FROM `test_table_in_6h`
> > > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > > `c_id`
> > > >
> > >
> > >
> > > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> > > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
> > >
> > > 【配置】
> > >
> > > > cat conf/flink-conf.yaml
> > > > jobmanager.rpc.address: flink-jobmanager
> > > > taskmanager.numberOfTaskSlots: 1
> > > > blob.server.port: 6124
> > > > jobmanager.rpc.port: 6123
> > > > taskmanager.rpc.port: 6122
> > > > jobmanager.heap.size: 6144m
> > > > taskmanager.memory.process.size: 4g
> > > > taskmanager.memory.jvm-overhead.min: 1024m
> > > > taskmanager.memory.jvm-overhead.max: 2048m
> > > > taskmanager.debug.memory.log-interval: 10000
> > > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > > -XX:NumberOfGCLogFiles=10
> > > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > > >
> > >
> > >
> > >
> > > --
> > > **************************************
> > >  tivanli
> > > **************************************
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
> >
>
> --
>
> Best,
> Benchao Li
>


--
**************************************
 tivanli
**************************************
Reply | Threaded
Open this post in threaded view
|

Re: [flink-1.10.2] Blink SQL 超用内存严重

Tianwang Li
目前,观察到另外一个现象,
如果任务出现了异常,例如写Kafka失败,任务自动重启,这个时候就会突然飙升。
应该是任务失败之后,关闭重启,rocksdb占用到内存没有回收。
通过pmap查看,占用比较多内存多是很多个(128MB 和 64MB 内存块)。

另外,失败重启和如下多jira 描述重启任务多时候比较类似。


pmap图:




Tianwang Li <[hidden email]> 于2020年9月23日周三 下午9:11写道:
使用的是 `RocksDBStateBackend`, 是什么超用了内存, 配置了“taskmanager.memory.process.size: 4g”,
并且有预留 1G 用于jvm-overhead。 
现在超了2.8G,是什么超用的,我想了解一下。
如果控制不了,很容易被资源系统(yarn、k8s等) kill 了。


有没有,其他人有这方面的经验。



Benchao Li <[hidden email]> 于2020年9月23日周三 下午1:12写道:
超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。

郑斌斌 <[hidden email]> 于2020年9月23日周三 下午12:29写道:

>  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
> KILL 。
> 单流跑的话,比较正常。
> JOB的内存是4G。版本1.11.1
> ------------------------------------------------------------------
> 发件人:Benchao Li <[hidden email]>
> 发送时间:2020年9月23日(星期三) 10:50
> 收件人:user-zh <[hidden email]>
> 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
> Hi Tianwang,
>
> 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
> 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
>     join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
>     `Math.max(leftRelativeSize, rightRelativeSize) +
> allowedLateness`,根据你的SQL,这个值应该是6h
> 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
>     清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
>     数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
>     `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> 2;`,在你的SQL来讲,就是3h,也就是说
>     状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>
> 希望这个可以解答你的疑惑~
>
> [1] https://issues.apache.org/jira/browse/FLINK-18996
>
> Tianwang Li <[hidden email]> 于2020年9月22日周二 下午8:26写道:
>
> > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> >
> >
> > 【join】
> >
> > > SELECT `b`.`rowtime`,
> > > `a`.`c_id`,
> > > `b`.`openid`
> > > FROM `test_table_a` AS `a`
> > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > AND `a`.`openid` = `b`.`openid`
> > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> SECOND
> > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > >
> > >
> > 【window】
> >
> > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> AS
> > > `rowtime`,
> > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__windoow_start__`,
> > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__window_end__`,
> > > `c_id`,
> > > COUNT(`openid`) AS `cnt`
> > > FROM `test_table_in_6h`
> > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > `c_id`
> > >
> >
> >
> > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
> >
> > 【配置】
> >
> > > cat conf/flink-conf.yaml
> > > jobmanager.rpc.address: flink-jobmanager
> > > taskmanager.numberOfTaskSlots: 1
> > > blob.server.port: 6124
> > > jobmanager.rpc.port: 6123
> > > taskmanager.rpc.port: 6122
> > > jobmanager.heap.size: 6144m
> > > taskmanager.memory.process.size: 4g
> > > taskmanager.memory.jvm-overhead.min: 1024m
> > > taskmanager.memory.jvm-overhead.max: 2048m
> > > taskmanager.debug.memory.log-interval: 10000
> > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > -XX:NumberOfGCLogFiles=10
> > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > >
> >
> >
> >
> > --
> > **************************************
> >  tivanli
> > **************************************
> >
>
>
> --
>
> Best,
> Benchao Li
>
>

--

Best,
Benchao Li


--
**************************************
 tivanli
**************************************


--
**************************************
 tivanli
**************************************