Handling data skew in Flink SQL


yanggang_it_job
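Hello, I implemented a simple pipeline with Flink SQL: Kafka to Hive.
However, the job fails from time to time because one of the TaskManagers (TM) crashes. After investigating, I found the following log: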
Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
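
For reference, the two options named in the log are normally set in flink-conf.yaml. The fragment below is only a sketch: the sizes are illustrative placeholders, not values taken from this job, and they would need to be chosen according to the TM's total memory budget.

```yaml
# flink-conf.yaml (sizes are assumed placeholders, not recommendations)

# Direct/native memory reserved for user code and its dependencies (default: 0)
taskmanager.memory.task.off-heap.size: 256m

# Direct/native memory reserved for the Flink framework itself (default: 128mb)
taskmanager.memory.framework.off-heap.size: 512m
```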

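These two parameters can be tuned, but I don't think that addresses the root cause. I now suspect data skew. Why do I think it is data skew? See the charts below:
1. Heap memory usage curve:

It shows that heap memory (HeapMemory) usage differs greatly from one TM to another.

2. Direct memory curve:
It shows that direct memory (DirectMemory) usage also differs greatly from one TM to another.

Q1: If the failure is caused by data skew, how should it be handled in a Flink SQL job?
Q2: If it is not caused by data skew, what is the cause, and what is the solution?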

Best to you !!!