Broadcast stream causing GC overhead limit exceeded


Broadcast stream causing GC overhead limit exceeded

Eleanore Jin
Hi All, 

I am using Apache Beam on Flink (1.8.2). In my job, I use a Beam side input (which translates into a Flink NonKeyedBroadcastStream) to filter the data from the main stream.

I have been continuously running into OOM: GC overhead limit exceeded.

After running some experiments, I observed the following behaviour:
1. Run the job without the side input (broadcast stream): no OOM issue.
2. Run the job with the side input (a Kafka topic with 1 partition) and with data available from the side input: no OOM issue.
3. Run the job with the side input (a Kafka topic with 1 partition) but without any data from the side input: OOM issue.
4. In the heap dump, the messages (of type ObjectNode) cannot be GC'd, apparently because of references held by the broadcast stream.


My question is: what is the behaviour of the broadcast stream if no data is available on it? Does it cache the data from the main stream and wait to process until data becomes available from the broadcast stream?

Thanks a lot!
Eleanore

Re: Broadcast stream causing GC overhead limit exceeded

Fabian Hueske
Hi Eleanore,

The "GC overhead limit exceeded" error indicates that the JVM spends far too much time garbage collecting and recovers only a little memory with each run.
Since the program doesn't make any progress in such a situation, it is terminated with the GC overhead error. This typically happens when lots of temporary objects are created.
The root cause could be Flink, Beam, or your own code.
It's important to understand that this error is not directly related to a shortage of memory (although more memory can help mitigate the issue a bit) but rather indicates an implementation issue.
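As a small, self-contained illustration of that kind of temporary-object churn (not taken from the job in question): an accumulator declared as boxed `Long` allocates a fresh short-lived wrapper object on almost every iteration, while a primitive `long` allocates nothing.

```java
public class ChurnExample {
    // Allocation-heavy: `total += i` unboxes, adds, and boxes a new Long
    // (outside the small-value cache), creating garbage on every iteration.
    static long sumBoxed(int n) {
        Long total = 0L;
        for (int i = 0; i < n; i++) total += i;
        return total;
    }

    // Allocation-free: a primitive accumulator creates no garbage at all.
    static long sumPrimitive(int n) {
        long total = 0L;
        for (int i = 0; i < n; i++) total += i;
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sumBoxed(1000));      // 499500
        System.out.println(sumPrimitive(1000));  // 499500
    }
}
```

At a few thousand iterations this is harmless; in a hot path processing millions of records, the same pattern can keep the collector permanently busy.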

Coming back to your question: Flink's broadcast stream does *not* block or collect events from the non-broadcast side if the broadcast side doesn't serve events.
However, user-implemented operators (Beam, or your code in this case) often put non-broadcast events into state to wait for input from the other side.
Since the error is not about a lack of memory, the buffering in Flink state might not be the problem here.

Best, Fabian
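To make the buffering idea concrete, here is a toy model in plain Java. This is not the Flink or Beam API and all names are invented; it only sketches an operator that parks main-stream elements in state until the broadcast side has delivered something. If the broadcast side stays empty, the buffer grows without bound, which would match a heap dump full of un-collectable messages:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Toy model only: a hypothetical two-input operator, not the actual
// Flink/Beam implementation. All names here are made up for illustration.
public class BufferingOperator {
    private final Deque<String> buffered = new ArrayDeque<>(); // "operator state"
    private final Set<String> allowList = new HashSet<>();
    private boolean sideInputReady = false;
    private int emitted = 0;

    // Called for each element arriving on the broadcast (side-input) stream.
    public void onBroadcastElement(String key) {
        allowList.add(key);
        sideInputReady = true;
        while (!buffered.isEmpty()) {      // drain everything parked so far
            emitIfAllowed(buffered.poll());
        }
    }

    // Called for each element arriving on the main stream.
    public void onMainElement(String element) {
        if (!sideInputReady) {
            buffered.add(element);         // no side-input data yet: hold in state
        } else {
            emitIfAllowed(element);
        }
    }

    private void emitIfAllowed(String element) {
        if (allowList.contains(element)) emitted++;
    }

    public int bufferedCount() { return buffered.size(); }
    public int emittedCount()  { return emitted; }

    public static void main(String[] args) {
        BufferingOperator op = new BufferingOperator();
        for (int i = 0; i < 1000; i++) op.onMainElement("key" + (i % 10));
        System.out.println(op.bufferedCount()); // 1000: everything is parked
        op.onBroadcastElement("key3");
        System.out.println(op.bufferedCount()); // 0: the buffer was drained
        System.out.println(op.emittedCount());  // 100: the "key3" elements
    }
}
```

Scenario 3 in the original report corresponds to onMainElement firing continuously while onBroadcastElement never does: the buffer grows with every record until the heap is exhausted.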






Re: Broadcast stream causing GC overhead limit exceeded

Eleanore Jin
Hi Fabian, 

I just got confirmation from the Apache Beam community: Beam buffers the main-stream data until there is data from the broadcast stream.

Thanks!
Eleanore


Re: Broadcast stream causing GC overhead limit exceeded

Fabian Hueske
Hi Eleanore,

Thanks for sharing your findings with us. :-)

Cheers, Fabian
