Hi community,
For some highly parallel Flink jobs, when the HDFS NameNode is under pressure, the JobManager's file cleanup becomes a bottleneck. I have some questions and hope to get answers, thanks.
Motivation:
Flink job:
Parallelism = 4000
Checkpoint interval = 10s
More than 5 operators have state, so 4,000 * 5 = 20,000 files are generated every 10s.
A large number of state files are written to the chk-xxx directory, and only the JM cleans up these files. When the HDFS NameNode is under pressure, the JM cleans up files slowly and cannot keep up with the rate at which files are generated, leaving a large number of files behind in HDFS.
Increasing the value of `state.backend.fs.memory-threshold` can alleviate the problem, but it cannot fully solve it. So I hope to clean up at the directory level as much as possible, rather than the file level.
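For reference, the mitigation mentioned above is a one-line config change. A sketch (the default and upper limit below are my understanding of recent Flink versions; please check the docs for your version):

```yaml
# State smaller than this threshold is stored inline in the checkpoint
# metadata instead of as a separate file on HDFS, which reduces the
# number of small files per checkpoint. Default is 20kb; the maximum
# supported value is 1mb.
state.backend.fs.memory-threshold: 1mb
```

This only shrinks the number of files; with 20,000 state handles per checkpoint, many files are still created and must be deleted one by one.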
Idea:
When the CompletedCheckpoint#doDiscard method cleans up the state, there are three steps:
1. Clean up the _metadata file
2. Clean up all state data files
3. Clean up the chk-xxx directory, and recursive=false
Question: the _metadata file and many OperatorState files are in the chk-xxx directory. Could the following changes be made:
1. Do not clean up the _metadata file
2. Clean up all state data files, but skip the ones that are inside the chk-xxx directory
3. Delete the chk-xxx directory, and recursive=true
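To illustrate the expected benefit, here is a back-of-envelope model (not Flink code; the class and method names are my own) that counts the NameNode delete RPCs per checkpoint under the current and proposed schemes described above:

```java
// Hypothetical model of NameNode delete RPCs per discarded checkpoint.
// Assumes every delete (file or directory) costs one NameNode RPC.
public class DiscardRpcCount {

    // Current CompletedCheckpoint#doDiscard behavior as described above:
    // delete _metadata, then every state file individually, then the
    // chk-xxx directory with recursive=false.
    static long currentScheme(long filesInChkDir, long filesOutsideChkDir) {
        return 1                                   // _metadata
             + filesInChkDir + filesOutsideChkDir  // every state data file
             + 1;                                  // chk-xxx dir, recursive=false
    }

    // Proposed behavior: skip per-file deletes for anything inside chk-xxx
    // (including _metadata) and remove the whole directory with
    // recursive=true; files outside chk-xxx are still deleted individually.
    static long proposedScheme(long filesInChkDir, long filesOutsideChkDir) {
        return filesOutsideChkDir                  // only files outside chk-xxx
             + 1;                                  // chk-xxx dir, recursive=true
    }

    public static void main(String[] args) {
        // Numbers from the job above: parallelism 4000, 5 stateful
        // operators, assuming all 20,000 files land inside chk-xxx.
        long inDir = 4000L * 5;
        long outDir = 0;
        System.out.println("current : " + currentScheme(inDir, outDir));
        System.out.println("proposed: " + proposedScheme(inDir, outDir));
    }
}
```

In this best case the per-checkpoint delete RPCs drop from 20,002 to 1; files that live outside chk-xxx (e.g. in a shared directory) still need individual deletes.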
These changes would reduce HDFS calls in most scenarios and reduce NameNode pressure. I have completed the development and measured a significant benefit.
commit link:
https://github.com/1996fanrui/flink/commit/728cffd6e556515e69bc14e646b03c5edcd84934

Will there be some unknown risks? I hope to get expert guidance, thanks.
Best
fanrui