Since memory management on the Driver side is relatively simple, this article focuses mainly on Executor memory management.
On-heap Memory and Off-heap Memory Planning
The Executor runs as a JVM process, so Executor memory management is built on top of the JVM's memory management. Spark manages the JVM's on-heap memory in a finer-grained way, and it also introduces off-heap memory, which is requested directly from the operating system out of the worker node's system memory, to further optimize memory usage.
On-heap Memory
On-heap Memory Overview:
The size of on-heap memory is set when Spark starts, via the --executor-memory option of spark-submit or the spark.executor.memory parameter. Spark manages on-heap memory in a logical, "planned" way: the memory occupied by object instances is actually allocated and released by the JVM, so Spark can only record it after allocation and before release. For serialized objects, which exist as byte streams, the memory size can be calculated directly; for non-serialized objects, memory usage is estimated by periodic sampling rather than being recalculated every time a new data item is added. This reduces the time cost, but the error can be large, and at times the actual memory may be much larger than the estimate. As a result, Spark cannot accurately track the available on-heap memory, and therefore cannot completely avoid out-of-memory (OOM) errors.
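For reference, here is a minimal sketch of setting the on-heap size; the 4g value and the application name are illustrative only:

```scala
// Setting executor on-heap memory when submitting an application.
// Either pass it on the command line:
//   spark-submit --executor-memory 4g --class com.example.App app.jar
// or set the equivalent configuration key programmatically:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("on-heap-memory-example")        // illustrative name
  .config("spark.executor.memory", "4g")    // on-heap size per executor
  .getOrCreate()
```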
1. Dynamic memory allocation
Before Spark 1.6, static memory management was used: the sizes of storage memory, execution memory, and other memory were fixed for the lifetime of the Spark application. Since Spark 1.6, both on-heap and off-heap memory are managed dynamically (unified memory management): Storage and Execution share the same unified memory region, and each can dynamically occupy the other's free space.
The rules of the dynamic occupation mechanism are as follows:
1. A basic Storage region and a basic Execution region are set first (via the spark.memory.storageFraction parameter), which determines the range of space each side owns (see the configuration sketch after this list);
2. If both sides run out of space, data is spilled to disk;
3. If one side runs out of space while the other side has free space, it can borrow the other side's space ("running out of storage space" means there is not enough space to hold a complete Block).
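A minimal configuration sketch of these boundaries; the values shown are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

// spark.memory.fraction: fraction of the usable JVM heap given to the
// unified Storage + Execution region; the rest is Other/user memory.
// spark.memory.storageFraction: the portion of the unified region initially
// reserved for Storage; Execution can still borrow from it when it is free.
val conf = new SparkConf()
  .set("spark.memory.fraction", "0.6")
  .set("spark.memory.storageFraction", "0.5")
```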
In the case shown in Figure 1, Storage has borrowed Execution's free memory, and Execution is about to start and wants it back. Cached data whose storage level is StorageLevel.MEMORY_AND_DISK / MEMORY_AND_DISK_2 / MEMORY_AND_DISK_SER / MEMORY_AND_DISK_SER_2 is spilled to disk; if, after spilling, the occupied space still exceeds the dashed boundary, the remaining cached data is simply dropped. For this reason, cache/persist is generally used together with checkpoint. A Spark Core checkpoint stores an RDD to a distributed file system; a Spark Streaming checkpoint 1) saves the whole running environment and 2) saves unconsumed RDDs for fault recovery.
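A minimal sketch of pairing persist with checkpoint, assuming `sc` is an existing SparkContext; the paths are illustrative:

```scala
import org.apache.spark.storage.StorageLevel

// Cached blocks with a *_AND_DISK level can be spilled to disk when
// Execution reclaims borrowed memory; memory-only blocks may be dropped
// entirely, which is why a checkpoint is a useful safety net.
sc.setCheckpointDir("hdfs:///tmp/checkpoints")   // illustrative path
val rdd = sc.textFile("hdfs:///data/input")      // illustrative path
  .map(_.length)

rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.checkpoint()   // materialized to the checkpoint dir on the next action
rdd.count()
```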
In the case shown in Figure 2, Execution has borrowed Storage's free memory, but Storage is about to need it and wants the dashed-line region back. Execution replies: my Shuffle computation is too complex to make room, so the memory will not be returned. Storage can only grumble and wait (it really is a hard time for Storage) until Execution releases the space on its own.
2. Eviction and Dropping to Disk
Because all computations of the same Executor share a limited storage memory space, when a new Block needs to be cached but the remaining space is insufficient and cannot be borrowed dynamically, old Blocks in the LinkedHashMap are evicted. If the storage level of the evicted Block includes a disk requirement, the Block is dropped to disk; otherwise it is deleted directly (as sketched below). Blocks in the LinkedHashMap are traversed and evicted in LRU order, subject to two constraints: the evicted old Block must have the same MemoryMode as the new Block, i.e. both belong to off-heap or both to on-heap memory; and the old Block must not belong to the same RDD as the new Block, to avoid cyclic eviction.
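A heavily simplified sketch of this eviction rule, not Spark's actual MemoryStore; the BlockId and SimpleMemoryStore names are hypothetical stand-ins:

```scala
import scala.collection.mutable

// Hypothetical model: blocks live in a LinkedHashMap (insertion order ~ LRU),
// and old blocks are evicted until the new block fits, skipping blocks that
// belong to the same RDD as the incoming block.
final case class BlockId(rddId: Int, partition: Int)

class SimpleMemoryStore(capacity: Long) {
  private val entries = mutable.LinkedHashMap[BlockId, Long]() // blockId -> size in bytes
  private def used: Long = entries.values.sum

  def put(id: BlockId, size: Long): Boolean = {
    if (used + size > capacity) evictToFit(size, id.rddId)
    if (used + size <= capacity) { entries += (id -> size); true } else false
  }

  private def evictToFit(needed: Long, incomingRdd: Int): Unit = {
    val candidates = entries.keys.toList.iterator      // oldest blocks first
    while (used + needed > capacity && candidates.hasNext) {
      val victim = candidates.next()
      if (victim.rddId != incomingRdd) entries -= victim // drop to disk or delete
    }
  }
}
```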
When an RDD is cached, the records of its Partitions originally sit in Other memory and are not stored contiguously. After being transferred to Storage memory, each Partition becomes a Block in which records are stored contiguously; this process is called Unroll.
The Storage module logically uses the Block as its basic storage unit: after processing, each Partition of an RDD corresponds to a unique Block (the BlockId has the form rdd_<rddId>_<partitionId>). The Master on the Driver side manages and maintains the metadata of all Blocks of the Spark application, while the Slaves on the Executor side report Block updates to the Master and receive commands from it, such as adding or deleting the Blocks of an RDD.
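For reference, a tiny sketch of the block-naming convention described above; the helper below is illustrative, not Spark's internal class:

```scala
// Hypothetical helper mirroring the rdd_<rddId>_<partitionId> naming scheme
// used for cached RDD blocks (Spark builds these names internally).
def rddBlockName(rddId: Int, partitionId: Int): String =
  s"rdd_${rddId}_${partitionId}"

// e.g. partition 3 of RDD 42 is stored as the block "rdd_42_3"
println(rddBlockName(42, 3))
```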
3. Execution
Execution memory is mainly used to store the memory occupied by tasks during Shuffle execution. Shuffle is the process of repartitioning RDD data according to certain rules, and it consists of a Shuffle Write phase and a Shuffle Read phase.
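For context, a minimal example of an operation that triggers a Shuffle, assuming `sc` is an existing SparkContext; the data is illustrative:

```scala
// reduceByKey repartitions records by key, so it has a Shuffle Write phase
// on the map side and a Shuffle Read phase on the reduce side.
val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val reduced = pairs.reduceByKey(_ + _)   // triggers a shuffle
reduced.collect()
```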
3-1 Shuffle Write
(1) If the ordinary sort path is chosen on the map side, an ExternalSorter is used for sorting; while the data is held in memory, it occupies on-heap execution space;
(2) If the Tungsten sort path is chosen on the map side, a ShuffleExternalSorter is used to sort the data directly in its serialized form; while the data is held in memory, it may occupy on-heap or off-heap execution space, depending on whether off-heap memory is enabled and whether it is sufficient.
3-2 Shuffle Read
(1) When data is aggregated on the reduce side, it is handed to an Aggregator for processing; while the data is held in memory, it occupies on-heap execution space;
(2) If the final result needs to be sorted, the data is handed to an ExternalSorter again, also occupying on-heap execution space.
In both ExternalSorter and Aggregator, Spark uses a hash table called AppendOnlyMap to store data in on-heap execution memory. Not all data can be kept in the hash table during a Shuffle, however: its memory usage is estimated periodically, and when it grows too large and no more execution memory can be obtained from the MemoryManager, the in-memory data is written to disk files; this process is called Spill. The spilled files are finally merged.
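A heavily simplified sketch of the spill pattern described above, not Spark's actual AppendOnlyMap: combine values in an in-memory map and flush to a file whenever a size threshold is exceeded (the class name and threshold are illustrative):

```scala
import java.io.{File, PrintWriter}
import scala.collection.mutable

// Hypothetical spill-style aggregator: combine values per key in memory,
// and spill the map to a temporary file once it exceeds a size threshold.
class SpillableAggregator(maxEntries: Int) {
  private val map = mutable.HashMap[String, Long]()
  private val spillFiles = mutable.ArrayBuffer[File]()

  def insert(key: String, value: Long): Unit = {
    map.update(key, map.getOrElse(key, 0L) + value)   // in-memory combine
    if (map.size > maxEntries) spill()                // crude "size estimate"
  }

  private def spill(): Unit = {
    val file = File.createTempFile("spill", ".txt")
    val out = new PrintWriter(file)
    try map.foreach { case (k, v) => out.println(s"$k\t$v") }
    finally out.close()
    spillFiles += file
    map.clear()                                       // free the memory
  }

  // A real implementation would merge-sort the spill files together with
  // the remaining in-memory data; here we just expose them.
  def spilled: Seq[File] = spillFiles.toSeq
}
```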
4. Other Memory (Other)
Other memory is used to store uncached RDD data, metadata, and Spark object instances; for RDD data, the storage unit is the Partition.
Conclusion
Spark’s storage memory and execution memory are managed in different ways. For Storage, Spark uses a LinkedHashMap to centrally manage all Blocks, which are converted from the Partitions of RDDs that need to be cached; for Execution, Spark uses an AppendOnlyMap to store data during Shuffle, and in Tungsten sorting it even abstracts memory into pages, opening up a memory management mechanism that goes beyond the JVM.
Off-heap Memory
Spark introduces off-heap memory, which makes it possible to create space directly in the worker node's system memory to store serialized binary data.
Off-heap memory means that memory is allocated outside the Java virtual machine's heap and is managed directly by the operating system rather than by the JVM. The benefit is keeping the heap small so as to reduce the impact of garbage collection on the application. By operating the system's off-heap memory directly (as sketched below), Spark reduces unnecessary system overhead and frequent GC scans and collections, improving processing performance. Off-heap memory can be allocated and freed precisely (the timing of JVM garbage collection cannot be controlled precisely), and the space occupied by serialized data can be computed exactly, so off-heap memory is easier to manage and less error-prone than on-heap memory.
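As a generic illustration of off-heap allocation on the JVM (Spark's Tungsten layer uses its own internal Unsafe-based utilities; the direct ByteBuffer below is only a stand-in):

```scala
import java.nio.ByteBuffer

// A direct ByteBuffer lives outside the JVM heap: the 1 MB below is
// allocated from native memory and is not moved or scanned by the GC.
val offHeapBuffer: ByteBuffer = ByteBuffer.allocateDirect(1024 * 1024)
offHeapBuffer.putLong(0, 42L)          // write a value at a known offset
println(offHeapBuffer.getLong(0))      // read it back precisely
```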
Off-heap memory is not enabled by default. It can be turned on with the spark.memory.offHeap.enabled parameter, and its size is set with spark.memory.offHeap.size. Except that there is no Other space, off-heap memory is divided in the same way as on-heap memory: all concurrently running tasks share its storage and execution memory.
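A minimal configuration sketch; the 2g size is illustrative:

```scala
import org.apache.spark.SparkConf

// Off-heap memory is disabled by default; when enabled, its size must be
// set explicitly and is split into storage and execution regions only.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")
```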
Note: The figures are original; if you reproduce them, please credit the source.