As a memory-based distributed computing engine, Spark depends heavily on its memory management module. Understanding the basic principles of Spark memory management helps you develop better Spark applications and tune their performance: efficient memory usage is the key to good Spark performance, while improper usage leads to poor performance.

When a Spark application is executed, the Spark cluster starts the Driver and Executor JVM processes. The Driver program is responsible for: (1) creating the Spark context; (2) submitting Spark Jobs, converting each Job into computation Tasks, and sending the Tasks to Executors; (3) coordinating task scheduling across Executor processes.

An Executor process is responsible for: (1) running the Tasks assigned to it on a worker node and returning the results to the Driver; (2) providing storage for RDDs that require persistence.

This article focuses on Executor memory management, because Driver memory management is relatively simple and differs little from an ordinary JVM program. All memory management mentioned in this article therefore refers to Executor memory management.

Executor memory

As a JVM process, an Executor builds its memory management on top of the JVM’s memory management. There are roughly two types of memory, on-heap memory and off-heap memory, as shown in Figure 1 below:

On-heap memory

This refers to the JVM heap, whose size is configured at Spark application startup with the spark.executor.memory parameter. Concurrent tasks running within an Executor share the JVM heap. Under the unified memory management mechanism, on-heap memory can be roughly divided into the following four parts, as shown in Figure 2 (a code sketch of how these sizes are derived follows the list):

  1. Storage memory: stores Spark cache data, such as RDD cache, Broadcast variables, and Unroll data. Note that if memory is insufficient while unrolling, the data is spilled to disk (when the storage level allows it).
  2. Execution memory: stores temporary data required during Spark task execution, such as Shuffle, Join, Sort, and Aggregation buffers.
  3. User memory: the remaining Spark memory, left for user code, for example to hold the data structures you need inside RDD transformations.
  4. Reserved memory: memory reserved for the system. The default is 300 MB (RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024) in 1.6.0; in 2.4.4 this value is no longer exposed as a configurable parameter.
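To make the split concrete, here is a rough sketch of how the four region sizes are derived under unified memory management. The fractions are the Spark 2.x defaults (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5), and the 4g heap is just an illustrative value:

```scala
// Sketch of the on-heap split under unified memory management
// (Spark 2.x default fractions; heap size is illustrative).
val systemMemory   = 4L * 1024 * 1024 * 1024      // spark.executor.memory = 4g
val reservedMemory = 300L * 1024 * 1024           // RESERVED_SYSTEM_MEMORY_BYTES
val usableMemory   = systemMemory - reservedMemory

val memoryFraction  = 0.6                         // spark.memory.fraction (default)
val storageFraction = 0.5                         // spark.memory.storageFraction (default)

val unifiedMemory   = (usableMemory * memoryFraction).toLong   // Storage + Execution
val storageRegion   = (unifiedMemory * storageFraction).toLong // baseline Storage region
val executionRegion = unifiedMemory - storageRegion            // baseline Execution region
val userMemory      = usableMemory - unifiedMemory             // User memory
```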

Off-heap memory

Off-heap Memory can be conceptually divided into two categories:

Off-heap Memory(*)

Corresponding to (*) in Figure 1: memory outside the JVM heap but within the Executor process, used mainly by the JVM itself, interned strings, NIO buffers, and so on. It is configured with the spark.executor.memoryOverhead parameter; the size is typically executorMemory * 0.10, with a minimum of 384 MB.
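As a quick illustration of that default rule (a sketch, assuming an 8g executor):

```scala
// Default overhead = max(10% of executor memory, 384 MiB)
val executorMemoryMiB = 8L * 1024                                   // spark.executor.memory = 8g
val overheadMiB = math.max((executorMemoryMiB * 0.10).toLong, 384L)
// overheadMiB == 819 for an 8g executor
```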

Off-heap Memory(**)

Corresponding to (**) in Figure 1: to further optimize memory usage and improve Shuffle sorting efficiency, Spark introduced off-heap memory, which is allocated directly from the worker node’s system memory and stores serialized binary data. Spark can operate on this system memory directly, reducing unnecessary memory overhead and frequent GC scans and collections, which improves processing performance. Off-heap memory can be allocated and freed precisely, and the space occupied by serialized data can be computed exactly, so it is easier to manage and less error-prone than on-heap memory.

Off-heap memory is disabled by default. It can be enabled with the spark.memory.offHeap.enabled parameter, and its size is set with spark.memory.offHeap.size. Apart from having no Other (user and reserved) space, off-heap memory is divided in the same way as on-heap memory, as shown in Figure 3 below (again taking the unified memory management mechanism as an example), with all concurrently running tasks sharing its storage memory and execution memory.
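As a concrete example, the parameters discussed so far could be set when building the session. The values below are illustrative only, not recommendations; in practice these keys are often passed with spark-submit --conf instead:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("memory-config-demo")
  .config("spark.executor.memory", "8g")           // on-heap size
  .config("spark.executor.memoryOverhead", "1g")   // off-heap (*): JVM overhead, strings, NIO buffers
  .config("spark.memory.offHeap.enabled", "true")  // enable off-heap (**)
  .config("spark.memory.offHeap.size", "2g")       // size of the off-heap (**) storage/execution space
  .getOrCreate()
```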

Unified memory management mechanism

The unified memory management mechanism introduced in Spark 1.6 differs from static memory management in that storage memory and execution memory share the same space and can dynamically occupy each other’s free area, as shown in Figure 2 and Figure 3. The most important optimization is this dynamic occupancy mechanism, whose rules are as follows (a simplified code model follows the list):

  • A baseline split between the Storage and Execution regions is set with the spark.memory.storageFraction parameter, which determines the range of space each side owns.
  • If both sides run out of space, data is spilled to disk; if one side’s space is insufficient while the other side has free space, it can borrow the other side’s space (“insufficient” here means not enough to hold a complete Block).
  • If Execution memory has been occupied by Storage, the borrowed blocks can be evicted to disk, as shown in Figure 4, so that the space is “returned” to Execution.
  • If Storage memory has been occupied by Execution, the space cannot be forced back, because too many factors would have to be considered during the Shuffle process, making this complicated to implement; excess Storage data is instead spilled to disk, as shown in Figure 5.
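The following self-contained Scala model captures these borrowing rules in simplified form. It is a sketch for intuition only; Spark’s real UnifiedMemoryManager also tracks per-task allocations, memory modes, and eviction policy:

```scala
// Toy model of the dynamic occupancy rules (not Spark's actual code).
class ToyUnifiedMemory(total: Long, storageFraction: Double = 0.5) {
  private val storageRegion = (total * storageFraction).toLong // baseline split
  private var storageUsed   = 0L
  private var executionUsed = 0L

  // Execution can reclaim space that Storage borrowed beyond its baseline
  // region by evicting those blocks (they are dropped or spilled to disk).
  def acquireExecution(bytes: Long): Long = {
    var free = total - storageUsed - executionUsed
    if (free < bytes) {
      val evictable = math.max(storageUsed - storageRegion, 0L)
      val evicted   = math.min(evictable, bytes - free)
      storageUsed -= evicted
      free += evicted
    }
    val granted = math.min(bytes, free) // may still receive less than requested
    executionUsed += granted
    granted
  }

  // Storage may borrow free space, but can never evict Execution pages;
  // if nothing is free, the new block has to go to disk instead.
  def acquireStorage(bytes: Long): Boolean = {
    val free = total - storageUsed - executionUsed
    if (bytes <= free) { storageUsed += bytes; true } else false
  }
}
```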

Spark application memory optimization

Case 1

For one of our Spark applications, the executor memory was configured as spark.executor.memory = 124g and spark.yarn.executor.memoryOverhead = 10g. With large data volumes, it failed with the memory-exceeded error shown in Figure 6 below:

The error message makes the problem easy to understand: increasing spark.yarn.executor.memoryOverhead resolves it, and after raising it to 20g our application ran the task successfully. However, I found that in some Spark applications, no matter how much you increase this value, it does not help; see the Stack Overflow discussion on memory limits in the references.

Case 2

For a production Spark application (Spark version 1.5.1), the runtime Storage memory is shown in Figure 7 below. A code walkthrough revealed that, for RDDs of the same lineage, the resulting RDD was cached after every Action, so RDD data of the same lineage kept being cached into memory over and over. Under the unified memory management mechanism introduced in Spark 1.6, this causes memory overflow in some tasks with large data volumes. The solution is to manually unpersist the RDD of the previous lineage each time the resulting RDD is cached. After this optimization, runtime Storage memory is shown in Figure 8 below; Storage memory usage dropped by nearly 45%.
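Below is a minimal sketch of the fix, with illustrative RDD names (the real application’s lineage is of course more complex):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val sc = SparkSession.builder().appName("unpersist-demo").getOrCreate().sparkContext

var previous: RDD[Int] = sc.parallelize(1 to 1000000).cache()
previous.count()                       // Action materializes the first cache

for (_ <- 1 to 5) {
  val next = previous.map(_ + 1).cache()
  next.count()                         // Action computes and caches `next`
  previous.unpersist()                 // release the predecessor's Storage memory
  previous = next
}
```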

The above summarizes the Spark memory model and my personal experience with memory tuning. My knowledge is limited, so if there is any mistake, please correct me. If you found this summary useful, please give it a thumbs-up.

References

(1) www.ibm.com/developerwo…
(2) stackoverflow.com/questions/4…
(3) spark.apache.org/docs/latest…
(4) www.jianshu.com/p/10e91ace3…
(5) www.slideshare.net/databricks/…