The Spark kernel refers to Spark’s core operating mechanisms: how the core components run, how tasks are scheduled, how memory is managed, and how the core functions work. A solid grasp of these principles helps us design Spark code well and pinpoint the root cause of problems while a project is running.
Spark Shuffle analysis
Shuffle fundamentals
ShuffleMapStage and ResultStage
When a job is divided into stages, the last stage is called the finalStage. It is essentially a ResultStage object, and all preceding stages are ShuffleMapStages. A ShuffleMapStage ends when its shuffle files have been written to disk. A ResultStage basically corresponds to an action operator in the code: a function is applied to the data of each partition of the RDD, which marks the completion of a job.
Number of tasks in a Shuffle
A Spark Shuffle consists of a map phase and a reduce phase, also called the shuffle write phase and the shuffle read phase. A Shuffle therefore involves a set of map tasks and a set of reduce tasks. How are the numbers of map and reduce tasks determined?
If the Spark job reads data from HDFS, the initial number of RDD partitions is determined by the number of splits in the file: each split corresponds to one partition of the generated RDD. Assume the initial number of partitions is N.
The initial RDD then passes through a series of operators. If no repartition or coalesce operator is used, the number of partitions stays at N; if one is used, it becomes M. Assume here that the number of partitions is unchanged. When the Shuffle is performed, the number of map tasks equals the number of partitions, that is, N.
On the reduce side, the value of spark.default.parallelism is used as the number of partitions by default. If it is not configured, the partition count of the last RDD on the map side (N) is used. This partition count determines the number of reduce tasks.
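The counting rule above can be written down in a few lines. This is a minimal Python sketch of the rule as the text states it, not Spark code; the function name and its arguments are hypothetical.

```python
from typing import Optional, Tuple


def shuffle_task_counts(map_side_partitions: int,
                        default_parallelism: Optional[int] = None) -> Tuple[int, int]:
    """Return (map_tasks, reduce_tasks) for one Shuffle.

    map_side_partitions: partition count of the last RDD before the Shuffle (N).
    default_parallelism: value of spark.default.parallelism, or None if unset.
    """
    map_tasks = map_side_partitions
    if default_parallelism is not None:
        reduce_tasks = default_parallelism
    else:
        # Fall back to the partition count of the last map-side RDD.
        reduce_tasks = map_side_partitions
    return map_tasks, reduce_tasks


print(shuffle_task_counts(8))
print(shuffle_task_counts(8, default_parallelism=20))
```

The sketch ignores operators that take an explicit partition count, which would override both values.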
Pulling data on the reduce side
According to the stage division, map tasks and reduce tasks are not in the same stage: map tasks belong to a ShuffleMapStage and reduce tasks to a ResultStage. Map tasks run first, so how does a later reduce task know where to pull the map output from? The reduce-side pull process is as follows:
- After a map task finishes, it encapsulates information such as its computation status and the locations of its small disk files in a MapStatus object. The MapOutputTrackerWorker in that process then sends the MapStatus object to the MapOutputTrackerMaster in the Driver process.
- Before a reduce task starts, the MapOutputTrackerWorker in its process asks the MapOutputTrackerMaster in the Driver process for the locations of the small disk files.
- Once all map tasks have finished, the MapOutputTrackerMaster in the Driver process knows the locations of all small disk files and returns them to the MapOutputTrackerWorker.
- The BlockTransferService then pulls the data from the Executor0 node, starting five child threads by default. Each pull fetches at most 48 MB, and the pulled data is stored in a region that takes up 20% of Executor memory.
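The hand-off described in the list can be modeled with a small in-memory registry. The sketch below is a toy model in Python under stated assumptions: ToyMapOutputTracker and plan_fetches are hypothetical names, the 48 MB cap is taken from the text, and none of this is Spark’s actual API.

```python
MB = 1024 * 1024


class ToyMapOutputTracker:
    """Hypothetical stand-in for the driver-side tracker (not Spark's API)."""

    def __init__(self, num_map_tasks):
        self.num_map_tasks = num_map_tasks
        self.statuses = {}  # map_id -> (executor, {reduce_id: size_in_bytes})

    def register(self, map_id, executor, sizes_by_reduce):
        # Corresponds to a map task reporting its MapStatus to the driver.
        self.statuses[map_id] = (executor, dict(sizes_by_reduce))

    def locations_for(self, reduce_id):
        # Locations are only complete once every map task has reported.
        if len(self.statuses) < self.num_map_tasks:
            raise RuntimeError("map outputs are not complete yet")
        return [(executor, sizes[reduce_id])
                for _, (executor, sizes) in sorted(self.statuses.items())
                if sizes.get(reduce_id, 0) > 0]


def plan_fetches(blocks, max_bytes_per_pull=48 * MB):
    """Greedily group (executor, size) blocks so each pull stays under the cap.

    A single block larger than the cap still forms its own pull.
    """
    batches, current, current_size = [], [], 0
    for executor, size in blocks:
        if current and current_size + size > max_bytes_per_pull:
            batches.append(current)
            current, current_size = [], 0
        current.append((executor, size))
        current_size += size
    if current:
        batches.append(current)
    return batches


tracker = ToyMapOutputTracker(num_map_tasks=2)
tracker.register(0, "executor0", {0: 30 * MB, 1: 10 * MB})
tracker.register(1, "executor1", {0: 30 * MB})
print(plan_fetches(tracker.locations_for(0)))
```

Asking for locations before both map tasks have registered raises an error, which mirrors the requirement that all map tasks finish before the reduce side can pull.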
HashShuffle analysis
The following discussion assumes that each Executor has one CPU core.
Unoptimized HashShuffleManager
In the shuffle write phase, once a stage has finished its computation, the data processed by each task is partitioned by key so that the next stage can run a shuffle operator such as reduceByKey. Partitioning here means applying a hash algorithm to each key so that records with the same key are written to the same disk file, and each disk file belongs to exactly one task of the downstream stage. Data is first written to a memory buffer; when the buffer is full, it is spilled to the disk file.
Each task of the current stage creates as many disk files as there are tasks in the next stage. For example, if the next stage has 100 tasks, each task of the current stage creates 100 disk files. If the current stage has 50 tasks running on 10 Executors, with each Executor running 5 tasks, then each Executor creates 500 disk files and all Executors together create 5,000. The number of disk files generated by an unoptimized shuffle write is therefore staggering.
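The arithmetic in this example can be checked with a short calculation. The following Python sketch only reproduces the counting from the text; the function name is hypothetical, and it assumes map tasks are spread evenly across Executors.

```python
def unoptimized_hash_shuffle_files(map_tasks, reduce_tasks, executors):
    """Return (files_per_executor, total_files) for the unoptimized case."""
    # Every map task writes one file per downstream (reduce) task.
    total_files = map_tasks * reduce_tasks
    # Assumption: map tasks are distributed evenly over the executors.
    files_per_executor = total_files // executors
    return files_per_executor, total_files


per_executor, total = unoptimized_hash_shuffle_files(
    map_tasks=50, reduce_tasks=100, executors=10)
print(per_executor, total)
```

With the numbers from the text this gives 500 files per Executor and 5,000 in total.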
The shuffle read phase usually happens at the beginning of a stage. Each task of that stage pulls all records with its keys from the previous stage’s results on every node, over the network, to its own node, and then performs operations such as aggregation or join by key. Because each map task created one disk file for every reduce task of the downstream stage during shuffle write, each reduce task only needs to pull its own disk file from the node of every upstream map task during shuffle read.
The shuffle read pull process aggregates as it pulls. Each shuffle read task has its own buffer and can pull only as much data as fits in the buffer at a time; it then aggregates that data through a Map in memory. After one batch has been aggregated, the next batch is pulled into the buffer and aggregated, and so on until all the data has been pulled and the final result is obtained.
The non-optimized HashShuffleManager works like this:
Optimized HashShuffleManager
To optimize HashShuffleManager we can set one parameter, spark.shuffle.consolidateFiles. The default value of this parameter is false. Setting it to true will enable the optimization mechanism. If we use HashShuffleManager, it is recommended that this option be turned on.
After the consolidate mechanism is enabled, a task no longer creates a disk file for every task of the downstream stage during shuffle write. Instead, the concept of a shuffleFileGroup is introduced. Each shuffleFileGroup corresponds to a batch of disk files, and the number of files in the batch equals the number of tasks in the downstream stage. An Executor can run as many tasks in parallel as it has CPU cores. Each task in the first batch to run in parallel creates a shuffleFileGroup and writes its data to the corresponding disk files.
When the Executor CPU core completes a batch of tasks and then executes the next batch of tasks, the next batch of tasks will reuse the existing shuffleFileGroup, including the disk files in it. That is, the task will write data to the existing disk files. It does not write to a new disk file. Therefore, consolidate allows different tasks to reuse the same batch of disk files. In this way, disk files of multiple tasks are consolidated to a certain extent, greatly reducing the number of disk files and improving shuffle Write performance.
Assume the second stage has 100 tasks and the first stage has 50 tasks, and there are still 10 Executors (each with one CPU core), each running 5 tasks. The unoptimized HashShuffleManager would generate 500 disk files per Executor and 5,000 in total. After optimization, the number of disk files per Executor is the number of CPU cores multiplied by the number of tasks in the next stage, so each Executor creates 100 disk files and all Executors together create 1,000.
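The same counting can be done for the consolidated case. This Python sketch is a worked version of the formula in the text (CPU cores per Executor times downstream tasks); the function name is hypothetical.

```python
def consolidated_hash_shuffle_files(cores_per_executor, reduce_tasks, executors):
    """Return (files_per_executor, total_files) with consolidation enabled."""
    # One shuffleFileGroup per core, each holding one file per downstream task.
    files_per_executor = cores_per_executor * reduce_tasks
    total_files = files_per_executor * executors
    return files_per_executor, total_files


per_executor, total = consolidated_hash_shuffle_files(
    cores_per_executor=1, reduce_tasks=100, executors=10)
print(per_executor, total)
```

Note that the number of map tasks no longer appears in the formula: later batches of tasks reuse the files created by the first batch.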
The working principle of the optimized HashShuffleManager is shown in the figure below:
SortShuffle analysis
SortShuffleManager has two operating mechanisms: the ordinary mechanism and the bypass mechanism. When the number of shuffle read tasks is less than or equal to the value of spark.shuffle.sort.bypassMergeThreshold (200 by default), the bypass mechanism is enabled.
Ordinary operating mechanism
In this mode, data is first written into an in-memory data structure, and the structure chosen depends on the shuffle operator. For an aggregating shuffle operator such as reduceByKey, a Map is used, and data is aggregated through the Map as it is written to memory. For an ordinary shuffle operator such as join, an Array is used and data is written to memory directly. After each record is written, Spark checks whether a threshold has been reached. If so, it tries to spill the in-memory data structure to disk and then clears it.
Before spilling to a disk file, the data in the in-memory structure is sorted by key. The sorted data is then written to the disk file in batches; the default batch size is 10,000 records. Writing goes through Java’s BufferedOutputStream, a buffered output stream that first buffers data in memory and writes to the disk file only when the buffer is full, which reduces the number of disk I/O operations and improves performance.
While a task writes all of its data into the in-memory structure, several spills may occur, producing several temporary files. Finally, all temporary disk files are merged. In this merge process, the data of all temporary files is read and written in turn to one final disk file. Because a task then has only one disk file, which holds the data prepared for all tasks of the downstream stage, a separate index file is also written to record the start offset and end offset of each downstream task’s data within that file.
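The relationship between the single data file and its index can be illustrated with an in-memory model. The following is a simplified Python sketch, assuming non-negative integer keys and a modulo partitioner; the function names and the text encoding of records are hypothetical and do not reflect Spark’s file format.

```python
def write_sorted_output(records, num_partitions):
    """Lay out (key, value) records partition by partition in one buffer.

    Returns (data, offsets) where partition p occupies
    data[offsets[p]:offsets[p + 1]].
    """
    ordered = sorted(records, key=lambda kv: (kv[0] % num_partitions, kv[0]))
    data = bytearray()
    lengths = [0] * num_partitions
    for key, value in ordered:
        encoded = f"{key}={value}\n".encode("utf-8")
        data += encoded
        lengths[key % num_partitions] += len(encoded)
    # The "index file": cumulative start offsets plus one final end offset.
    offsets = [0]
    for length in lengths:
        offsets.append(offsets[-1] + length)
    return bytes(data), offsets


def read_partition(data, offsets, partition):
    """What a downstream task does: read only its own slice."""
    chunk = data[offsets[partition]:offsets[partition + 1]]
    return chunk.decode("utf-8").splitlines()


data, offsets = write_sorted_output([(3, "c"), (1, "a"), (4, "d"), (2, "b")], 2)
print(offsets)
print(read_partition(data, offsets, 0))
print(read_partition(data, offsets, 1))
```

Each downstream task needs only two adjacent index entries to find its data, which is why one data file per map task is enough.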
SortShuffleManager greatly reduces the number of files because it has a disk file merge process. For example, the first stage has 50 tasks and a total of 10 executors, each executing 5 tasks, while the second stage has 100 tasks. Because each task ends up with only one disk file, there are only five disk files per Executor and 50 disk files for all executors at this point.
The working principle of SortShuffleManager under the ordinary mechanism is shown in the figure:
Bypass operating mechanism
The bypass mechanism is triggered when both of the following conditions hold:
- The number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter.
- The shuffle operator is not an aggregating operator.
In this case, each task creates a temporary disk file for every downstream task, hashes each record by key, and writes the record to the disk file selected by the hash value of its key. As before, data is written to a memory buffer first and spilled to the disk file when the buffer is full. Finally, all temporary disk files are merged into a single disk file, and a single index file is created.
The disk write mechanism here is exactly the same as in the unoptimized HashShuffleManager, since a staggering number of disk files are created; the only difference is the merge at the end. The small number of final disk files means that shuffle read performs better with this mechanism than with the unoptimized HashShuffleManager.
This mechanism differs from the ordinary SortShuffleManager mechanism in two ways: first, the disk write mechanism is different; second, no sorting is performed. The biggest advantage of the bypass mechanism is therefore that shuffle write does not need to sort the data, which saves that performance overhead.
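The choice between the two mechanisms comes down to a small predicate. The Python sketch below encodes the two trigger conditions under the assumption that the threshold is compared against the number of downstream (reduce) partitions; the function and parameter names are hypothetical.

```python
def uses_bypass(num_reduce_partitions, map_side_aggregation,
                bypass_merge_threshold=200):
    """Return True if the bypass mechanism would be chosen.

    bypass_merge_threshold stands for spark.shuffle.sort.bypassMergeThreshold.
    """
    if map_side_aggregation:
        # Aggregating operators such as reduceByKey never take the bypass path.
        return False
    return num_reduce_partitions <= bypass_merge_threshold


print(uses_bypass(100, map_side_aggregation=False))
print(uses_bypass(100, map_side_aggregation=True))
print(uses_bypass(500, map_side_aggregation=False))
```

Raising the threshold above the number of reduce partitions is therefore one way to avoid the sort for non-aggregating shuffles, at the cost of more temporary files.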
The working principle of SortShuffleManager of the bypass operation mechanism is shown in the figure below:
Spark Memory Management
When a Spark application runs, the Spark cluster starts two kinds of JVM processes: the Driver and the Executors. The Driver is the main control process; it creates the Spark context, submits Spark jobs, converts jobs into computing tasks, and coordinates task scheduling among the Executor processes. The Executors run the actual computing tasks on the worker nodes, return results to the Driver, and provide storage for RDDs that need to be persisted. Memory management in the Driver is relatively simple, so this section mainly analyzes Executor memory management; Spark memory below refers to Executor memory.
On-heap and off-heap memory planning
As a JVM process, an Executor builds its memory management on top of the JVM’s, and Spark plans the JVM’s on-heap space in more detail to make full use of it. Spark also introduces off-heap memory, which lets it allocate space directly in the system memory of the worker node and further optimize memory usage. On-heap memory is managed by the JVM, while off-heap memory is requested from and released to the operating system directly.
On-heap memory
The size of on-heap memory is set by the --executor-memory option or the spark.executor.memory parameter when the Spark application starts. Concurrent tasks running in an Executor share the JVM heap. The memory these tasks use to cache RDD data and broadcast data is planned as storage memory, and the memory they use while executing a Shuffle is planned as execution memory. The remainder is not specifically planned; object instances inside Spark and in the user-defined application code occupy it. The space taken by these three parts differs between management modes. Spark manages on-heap memory only logically. Because the memory of object instances is allocated and released by the JVM, Spark can only record it after allocation and before release. Memory is allocated as follows:
- Spark creates an object instance with new in its code;
- The JVM allocates space from memory in the heap, creates objects and returns object references;
- Spark saves a reference to the object and records the memory occupied by the object.
The process for releasing memory is as follows:
- Spark records the memory released by the object and deletes its reference.
- Wait for the JVM’s garbage collection mechanism to release the heap memory occupied by the object.
JVM objects can also be stored in serialized form. Serialization converts an object into a binary byte stream; in essence, it turns chained storage scattered over non-contiguous space into contiguous, or block, storage. Accessing the data requires the reverse process, deserialization, which converts the byte stream back into an object. Serialization saves storage space but adds computational overhead when storing and reading.
For serialized objects, Spark can compute the memory usage directly because they are byte streams. For non-serialized objects, the usage is estimated by periodic sampling; that is, it is not recomputed every time a data item is added. This reduces the time overhead but can produce large errors, so the actual memory in use at a given moment may far exceed the estimate. In addition, object instances that Spark has marked as released may well not have been reclaimed by the JVM yet, so the memory actually available can be less than what Spark has recorded. Spark therefore cannot accurately track the on-heap memory actually available and cannot completely avoid out-of-memory (OOM) exceptions.
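To see why sampled estimates can lag behind reality, consider the toy estimator below. It is a simplified Python illustration under stated assumptions: SampledSizeTracker is a hypothetical class that measures the collection only every few inserts and extrapolates in between. Spark’s real estimator is more elaborate and is not reproduced here.

```python
class SampledSizeTracker:
    """Estimate a collection's size by measuring it only every few inserts."""

    def __init__(self, measure, sample_every=4):
        self.measure = measure            # exact (expensive) size function
        self.sample_every = sample_every
        self.inserts = 0
        self.last_sample_size = 0
        self.last_sample_inserts = 0
        self.bytes_per_insert = 0.0

    def after_insert(self, collection):
        self.inserts += 1
        if self.inserts % self.sample_every == 0:
            size = self.measure(collection)
            delta = self.inserts - self.last_sample_inserts
            self.bytes_per_insert = (size - self.last_sample_size) / delta
            self.last_sample_size = size
            self.last_sample_inserts = self.inserts

    def estimate(self):
        # Extrapolate from the last sample using the observed growth rate.
        extra = self.inserts - self.last_sample_inserts
        return self.last_sample_size + self.bytes_per_insert * extra


# Each list element stands for the size in bytes of one inserted record.
sizes = []
tracker = SampledSizeTracker(measure=sum, sample_every=4)
for record_size in [10, 10, 10, 10, 1000]:
    sizes.append(record_size)
    tracker.after_insert(sizes)

print("estimated:", tracker.estimate(), "actual:", sum(sizes))
```

The fifth record is far larger than the sampled average, so the estimate stays close to the old growth rate until the next sample is taken. This is the kind of gap that the text describes as a source of OOM risk.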
Although Spark cannot precisely control the allocation and release of on-heap memory, it plans and manages storage memory and execution memory independently. It can therefore decide whether to cache a new RDD in storage memory and whether to allocate execution memory to a new task, which improves memory utilization to some extent and reduces the occurrence of exceptions.
Off-heap memory
To optimize memory usage and improve Shuffle sorting efficiency, Spark introduces off-heap memory, which can create space in the system memory of the working node to store serialized binary data.
Out-of-heap memory means that memory objects are allocated to memory outside the Java virtual machine’s heap, which is managed directly by the operating system (rather than the virtual machine). The result is to keep the heap small to reduce the impact of garbage collection on the application.
Spark operates on off-heap memory directly through the JDK Unsafe API. (Starting with Spark 2.0, off-heap storage memory is no longer managed through Tachyon but uses the same JDK Unsafe API implementation as off-heap execution memory.) This removes unnecessary memory overhead as well as frequent GC scanning and collection, and improves processing performance. Off-heap memory can be allocated and released precisely because allocation and release go directly to the operating system rather than through the JVM, whereas the JVM cannot guarantee the exact moment at which memory is cleaned up. Moreover, the space occupied by serialized data can be calculated exactly, so off-heap memory is easier to manage and less error-prone than on-heap memory.
Off-heap memory is disabled by default. It can be enabled with the spark.memory.offHeap.enabled parameter, and its size is set with the spark.memory.offHeap.size parameter. Apart from having no “other” region, off-heap memory is divided in the same way as on-heap memory, and all concurrently running tasks share its storage memory and execution memory.
(This part of memory is mainly used for shared libraries, perm space, thread stacks, some memory mappings, and objects allocated in a C-like manner.)
Memory allocation
Static memory management
Under the static memory management mechanism initially adopted by Spark, the storage memory, execution memory, and other memory sizes are fixed during the Spark application running, but can be configured before the application is started. The memory allocation in the heap is shown in the following figure:
The available on-heap memory is calculated as follows:
available storage memory = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
available execution memory = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
systemMaxMemory depends on the size of the current JVM heap, and the available execution or storage memory is obtained by multiplying it by the respective memoryFraction and safetyFraction parameters. The two safetyFraction parameters logically reserve a safety margin of 1 - safetyFraction, which lowers the risk of OOM when actual memory use exceeds the preset range (as noted above, sampled size estimates for non-serialized objects can be inaccurate). Note that this reserved margin is only a logical plan; in practice Spark treats it no differently from “other” memory.
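A worked example makes the formula concrete. The Python sketch below evaluates the two products for a given heap size; the function name is hypothetical, and the default fractions used here (0.6, 0.9, 0.2, 0.8) are assumed to match the legacy static-mode defaults and should be checked against the Spark version in use.

```python
def static_on_heap_regions(system_max_memory,
                           storage_fraction=0.6, storage_safety=0.9,
                           shuffle_fraction=0.2, shuffle_safety=0.8):
    """Return (storage, execution, other) sizes under static management.

    storage_fraction / storage_safety stand for
    spark.storage.memoryFraction / spark.storage.safetyFraction, and
    shuffle_fraction / shuffle_safety for
    spark.shuffle.memoryFraction / spark.shuffle.safetyFraction.
    The default values are assumptions, not verified for every release.
    """
    storage = system_max_memory * storage_fraction * storage_safety
    execution = system_max_memory * shuffle_fraction * shuffle_safety
    # Everything else, including both safety margins, behaves like "other".
    other = system_max_memory - storage - execution
    return storage, execution, other


storage, execution, other = static_on_heap_regions(10000)  # e.g. 10000 MB heap
print(f"storage={storage:.0f} MB execution={execution:.0f} MB other={other:.0f} MB")
```

With these assumed fractions, only a little over half of the heap is usable as storage memory and about one sixth as execution memory; the rest is safety margin or unplanned space.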
Both storage memory and execution memory thus carry a reserved margin to guard against OOM, because Spark’s record of on-heap memory usage is inaccurate and some headroom is needed.
Off-heap space allocation is simpler: there are only storage memory and execution memory, as shown in Figure 1-3. The split of available space between execution memory and storage memory is determined directly by the spark.memory.storageFraction parameter. Because the space occupied in off-heap memory can be calculated exactly, no safety margin is needed.
The static memory management mechanism is relatively simple to implement. However, if users are unfamiliar with Spark’s storage mechanism, or do not configure it according to the actual data size and computing tasks, it easily leads to a “half seawater, half flame” situation: one of storage memory and execution memory has plenty of space left while the other fills up early and has to evict or remove old content to make room for new content. Because of the newer memory management mechanism, this approach is now rarely used, and Spark retains its implementation only for compatibility with older applications.
Unified Memory Management
The unified memory management mechanism, introduced in Spark 1.6, differs from static memory management in that storage memory and execution memory share one space and can dynamically occupy each other’s free regions. The on-heap memory structure under unified memory management is shown in the following figure:
The most important optimization is the dynamic occupancy mechanism, whose rules are as follows:
- A base region is set for storage memory and execution memory (the spark.memory.storageFraction parameter), which determines the space each side owns;
- If neither side has enough space, data is spilled to disk. If one side runs short while the other has free space, it can borrow the other side’s space. (Storage space is considered insufficient when it cannot hold a complete Block.)
- When execution memory’s space has been occupied by storage, execution can make storage move the occupied part to disk and then return the borrowed space;
- When storage memory’s space has been occupied by execution, storage cannot make execution return it, because many factors in the Shuffle process would have to be considered and the implementation would be complicated.
The dynamic occupancy mechanism of unified memory management is shown in the figure below:
Spark’s unified memory management mechanism improves the utilization of in-heap and off-heap memory resources to some extent, making it easier for developers to maintain Spark memory, but that doesn’t mean developers can rest easy. Too much storage space or too much cached data can lead to frequent full garbage collection, reducing performance at task execution because cached RDD data usually resides in memory for a long time. Therefore, to make full use of Spark’s performance, developers need to learn more about the management modes and implementation principles of storage memory and execution memory.
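The dynamic occupancy rules described above can be sketched as a small accounting model. This is a toy Python model under stated assumptions: ToyUnifiedPool is a hypothetical class, sizes are plain integers, and eviction is reduced to a counter update. It only illustrates the asymmetry between the two sides and is not Spark’s implementation.

```python
class ToyUnifiedPool:
    """Toy model of one shared region split into storage and execution."""

    def __init__(self, total, storage_fraction=0.5):
        self.total = total
        # Base storage region, standing for spark.memory.storageFraction.
        self.storage_region = int(total * storage_fraction)
        self.storage_used = 0
        self.execution_used = 0

    def free(self):
        return self.total - self.storage_used - self.execution_used

    def acquire_storage(self, num_bytes):
        # Storage may use any free space, including execution's idle share,
        # but it can never force execution to give memory back.
        if num_bytes <= self.free():
            self.storage_used += num_bytes
            return True
        return False

    def acquire_execution(self, num_bytes):
        shortfall = num_bytes - self.free()
        if shortfall > 0:
            # Execution may reclaim only what storage borrowed beyond its
            # base region; those blocks are evicted (to disk or dropped).
            borrowed = max(0, self.storage_used - self.storage_region)
            self.storage_used -= min(borrowed, shortfall)
        granted = min(num_bytes, self.free())
        self.execution_used += granted
        return granted


pool = ToyUnifiedPool(total=100)
pool.acquire_storage(80)            # storage borrows 30 from execution's share
print(pool.acquire_execution(40))   # execution reclaims part of the loan
print(pool.storage_used, pool.execution_used)
```

The model leaves out storage evicting its own older blocks to make room for new ones, which is a separate mechanism.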
Storage memory management
RDD persistence mechanism
The Resilient Distributed Dataset (RDD) is Spark’s most fundamental data abstraction. It is a read-only collection of partitioned records that can only be created from a dataset in stable physical storage or by applying a transformation to an existing RDD to produce a new one. The dependencies between a transformed RDD and its parent form the lineage, and with lineage Spark ensures that every RDD can be recovered. All RDD transformations are lazy: only when an action that returns a result to the Driver occurs does Spark create tasks to read the RDD and actually trigger the transformations.
When a task starts and reads a partition, it first checks whether the partition has been persisted. If not, it checks for a checkpoint or recomputes the partition from its lineage. So if you want to run several actions on one RDD, you can use the persist or cache method at the first action to persist or cache the RDD in memory or on disk, which speeds up the later actions.
In fact, the cache method persists RDD to memory using the default MEMORY_ONLY storage level, so caching is a special kind of persistence. The design of in-heap and off-heap storage memory allows unified planning and management of the memory used for caching RDD.
Spark’s Storage module is responsible for RDD persistence and decouples RDDs from physical storage. It manages the data generated during Spark computation and encapsulates the functions for accessing data in memory or on disk, locally or remotely. The Storage modules on the Driver and Executors form a master-slave architecture: the BlockManager on the Driver is the Master, and the BlockManager on each Executor is a Slave.
The Storage module uses the Block as its logical storage unit. Each partition of an RDD corresponds to exactly one Block after processing (the BlockId has the form rdd_RDD-ID_PARTITION-ID). The Master on the Driver manages and maintains the metadata of all Blocks in the Spark application. The Slaves on the Executors report Block status updates to the Master and receive commands from it, for example to add or remove an RDD.
Spark defines seven storage levels for RDD persistence, such as MEMORY_ONLY and MEMORY_AND_DISK. A storage level is a combination of the following five variables:
class StorageLevel private(
    private var _useDisk: Boolean,      // store on disk
    private var _useMemory: Boolean,    // store in memory
    private var _useOffHeap: Boolean,   // store in off-heap memory
    private var _deserialized: Boolean, // keep in non-serialized form
    private var _replication: Int = 1   // number of copies
)
The seven storage levels of Spark are as follows:
| Persistence level | Meaning |
|---|---|
| MEMORY_ONLY | Persists the data in JVM memory as non-serialized Java objects. If memory cannot hold all partitions of the RDD, the partitions that were not persisted are recomputed the next time they are needed. |
| MEMORY_AND_DISK | Same as above, but partitions that do not fit in memory are persisted to disk and read from disk the next time they are needed. |
| MEMORY_ONLY_SER | Same as MEMORY_ONLY, but the Java objects are serialized before being persisted. This reduces memory overhead but requires deserialization, which increases CPU overhead. |
| MEMORY_AND_DISK_SER | Same as MEMORY_AND_DISK, but the Java objects are serialized before being persisted. |
| DISK_ONLY | Stores all of the data on disk only. |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | A level with the suffix _2 replicates the persisted data and saves one copy on another node, so that if data is lost it does not need to be recomputed and the backup can be used. |
From this data structure, we can see that a storage level defines how the partitions of an RDD (that is, Blocks) are stored along three dimensions:
1) Storage location: disk, on-heap memory, or off-heap memory. For example, MEMORY_AND_DISK stores data both on disk and in on-heap memory for redundancy, whereas OFF_HEAP stores data only in off-heap memory; at present, data stored off-heap cannot be stored in other locations at the same time.
2) Storage form: whether a Block cached in storage memory is kept in non-serialized form. For example, MEMORY_ONLY is non-serialized and OFF_HEAP is serialized.
3) Number of copies: if greater than 1, the Block must be replicated to other nodes. For example, DISK_ONLY_2 requires one remote backup copy.
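The three dimensions can be read off the five flags mechanically. The following Python sketch mirrors the flag combination idea with a hypothetical ToyStorageLevel dataclass; the flag values listed for each level are assumptions modeled on the table above, and OFF_HEAP is left out because its flags differ between Spark versions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyStorageLevel:
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int = 1


# Assumed flag combinations for some of the levels named in the text.
LEVELS = {
    "MEMORY_ONLY": ToyStorageLevel(False, True, False, True),
    "MEMORY_AND_DISK": ToyStorageLevel(True, True, False, True),
    "MEMORY_ONLY_SER": ToyStorageLevel(False, True, False, False),
    "MEMORY_AND_DISK_SER": ToyStorageLevel(True, True, False, False),
    "DISK_ONLY": ToyStorageLevel(True, False, False, False),
    "MEMORY_ONLY_2": ToyStorageLevel(False, True, False, True, 2),
    "DISK_ONLY_2": ToyStorageLevel(True, False, False, False, 2),
}


def describe(name):
    """Return (locations, form, copies) for a named level."""
    level = LEVELS[name]
    candidates = (
        ("disk", level.use_disk),
        ("on-heap memory", level.use_memory and not level.use_off_heap),
        ("off-heap memory", level.use_off_heap),
    )
    locations = [place for place, used in candidates if used]
    form = "non-serialized" if level.deserialized else "serialized"
    return locations, form, level.replication


for name in ("MEMORY_AND_DISK", "MEMORY_ONLY_SER", "DISK_ONLY_2"):
    print(name, describe(name))
```

Keeping the level as a combination of independent flags is what lets new levels be defined without new storage code paths.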
RDD caching process
Before an RDD is cached in storage memory, the data of a partition is generally accessed through an Iterator, the Scala construct for traversing a collection. Through the Iterator, each serialized or non-serialized record in the partition can be obtained. The object instances of these records logically occupy the “other” part of the JVM’s on-heap memory, and the storage of different records in the same partition is not contiguous.
After the RDD is cached in the storage memory, the Partition is converted to a Block, and the Record occupies a contiguous space in the in-heap or off-heap storage memory. The process of converting a Partition from a discontinuous storage space to a continuous storage space is called “Unroll” by Spark.
Blocks come in serialized and non-serialized forms, depending on the storage level of the RDD. A non-serialized Block is defined by the DeserializedMemoryEntry data structure, which stores all object instances in an array. A serialized Block is defined by the SerializedMemoryEntry data structure, which stores binary data in byte buffers (ByteBuffer). Each Executor’s Storage module uses a LinkedHashMap to manage all Block instances in on-heap and off-heap storage memory, and adding entries to or removing entries from this LinkedHashMap indirectly records the acquisition and release of memory.
Because there is no guarantee that storage space can hold all the data of the Iterator at once, the current computing task must request enough Unroll space from the MemoryManager to temporarily hold the data during Unroll. If space is insufficient, the Unroll fails; when space is sufficient, it can continue. For a serialized partition, the required Unroll space can be computed directly by accumulation and requested in one go.
For a non-serialized partition, space must be requested step by step while the records are traversed: each time a record is read, the Unroll space it needs is estimated by sampling and requested. When space runs out, the process can be interrupted and the Unroll space already occupied is released.
If the Unroll succeeds, the Unroll space occupied by the current Partition is converted to the normal storage space of the cache RDD, as shown in the following figure.
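The step-by-step reservation for a non-serialized partition can be sketched as follows. This is a simplified Python model with hypothetical names: it reserves space record by record using an exact size function, whereas the text describes sampled estimates, and it treats the available Unroll space as a fixed number.

```python
def unroll(records, size_of, available_bytes):
    """Materialize an iterator into a list while reserving space per record.

    Returns (block, reserved_bytes) on success, or (None, 0) if the space
    runs out, in which case everything reserved so far is released.
    """
    block = []
    reserved = 0
    for record in records:
        needed = size_of(record)
        if reserved + needed > available_bytes:
            # Interrupt the Unroll and give back the reserved space.
            return None, 0
        reserved += needed
        block.append(record)
    # On success the reserved Unroll space becomes ordinary storage space.
    return block, reserved


print(unroll(iter(["aa", "bbb", "c"]), len, available_bytes=10))
print(unroll(iter(["aa", "bbb", "c"]), len, available_bytes=4))
```

The failure path returns no partial block, matching the idea that an interrupted Unroll releases what it had occupied.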
Eviction and dropping to disk
Because all computing tasks of an Executor share a limited storage memory space, when a new Block needs to be cached but the remaining space is insufficient and cannot be borrowed dynamically, old Blocks in the LinkedHashMap must be evicted. If the storage level of an evicted Block also includes storage on disk, the Block is dropped to disk; otherwise it is simply deleted. The eviction rules for storage memory are:
- The old Block to be evicted must have the same MemoryMode as the new Block, that is, both belong to off-heap memory or both to on-heap memory.
- The old and new Blocks must not belong to the same RDD, to avoid cyclic eviction.
- The RDD to which the old Block belongs must not be in the process of being read, to avoid consistency problems.
- The Blocks in the LinkedHashMap are traversed and evicted in least recently used (LRU) order until the space required by the new Block is freed. LRU ordering is a feature of LinkedHashMap.
Dropping to disk is relatively simple. If the Block’s storage level has useDisk set to true, Spark checks its deserialized flag to see whether it is in non-serialized form; if so, it is serialized first. Finally the data is stored on disk and the Block’s information is updated in the Storage module.
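The eviction rules can be expressed as a scan over an access-ordered map. The Python sketch below uses collections.OrderedDict as a stand-in for LinkedHashMap; the function names and the tuple layout are hypothetical, the rule about RDDs currently being read is omitted, and the choice to evict nothing when not enough space can be freed is an assumption of this sketch.

```python
from collections import OrderedDict


def touch(blocks, block_id):
    """Record an access: move the block to the most recently used end."""
    blocks.move_to_end(block_id)


def select_victims(blocks, needed_bytes, new_block_rdd, new_block_mode):
    """Pick blocks to evict, least recently used first.

    blocks: OrderedDict of block_id -> (rdd_id, memory_mode, size_bytes),
            ordered from least to most recently used.
    """
    victims, freed = [], 0
    for block_id, (rdd_id, mode, size) in blocks.items():
        if freed >= needed_bytes:
            break
        if mode != new_block_mode:
            continue  # must share the new block's MemoryMode
        if rdd_id == new_block_rdd:
            continue  # never evict blocks of the same RDD
        victims.append(block_id)
        freed += size
    # Assumption: if the request cannot be satisfied, evict nothing.
    return victims if freed >= needed_bytes else []


blocks = OrderedDict([
    ("rdd_1_0", (1, "ON_HEAP", 40)),
    ("rdd_2_0", (2, "ON_HEAP", 30)),
    ("rdd_3_0", (3, "OFF_HEAP", 50)),
    ("rdd_4_0", (4, "ON_HEAP", 30)),
])
print(select_victims(blocks, 50, new_block_rdd=2, new_block_mode="ON_HEAP"))
touch(blocks, "rdd_1_0")  # rdd_1_0 is now the most recently used block
print(select_victims(blocks, 30, new_block_rdd=2, new_block_mode="ON_HEAP"))
```

After the touch call, the first block is no longer the oldest, so a later eviction picks a different victim.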
Execution memory management
Execution memory mainly holds the memory that tasks occupy while executing a Shuffle. A Shuffle is the process of repartitioning RDD data according to certain rules. Execution memory is used in both the Write and Read phases of a Shuffle:
Shuffle Write
1) If the ordinary sort is chosen on the map side, ExternalSorter is used for external sorting, and the data held in memory mainly occupies on-heap execution space.
2) If Tungsten sort is chosen on the map side, ShuffleExternalSorter sorts the data directly in serialized form. The data held in memory can occupy off-heap or on-heap execution space, depending on whether the user has enabled off-heap memory and whether off-heap execution memory is sufficient.
Shuffle Read
1) When data is aggregated on the reduce side, it is handed to an Aggregator, and the data held in memory occupies on-heap execution space.
2) If the final result needs to be sorted, the data is handed to ExternalSorter again, which occupies on-heap execution space.
In ExternalSorter and Aggregator, Spark uses a hash table called AppendOnlyMap to store data in on-heap execution memory. Not all data of a Shuffle can be kept in this hash table, however. The memory it occupies is estimated by periodic sampling, and when it grows so large that no more execution memory can be obtained from the MemoryManager, Spark writes its entire contents to a disk file. This process is called spilling (Spill), and the files spilled to disk are eventually merged.
The Tungsten used in the Shuffle Write phase is Databricks’ plan for optimizing Spark’s memory and CPU usage (Project Tungsten), which works around some of the JVM’s performance limitations and drawbacks. Spark automatically decides whether to use Tungsten sort based on the characteristics of the Shuffle.
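The spill-and-merge cycle can be illustrated with a dictionary that is flushed when it grows too large. This is a toy Python sketch with hypothetical names: the memory limit is expressed as a number of entries rather than sampled bytes, and spilled runs are kept in a list instead of disk files.

```python
def aggregate_with_spill(pairs, max_entries_in_memory):
    """Sum values by key, spilling the in-memory map when it grows too large.

    Returns (merged_result, number_of_spills).
    """
    current = {}
    spills = []  # each spill is a sorted list of (key, partial_sum)
    for key, value in pairs:
        current[key] = current.get(key, 0) + value
        if len(current) > max_entries_in_memory:
            # Stand-in for writing the whole hash table to a disk file.
            spills.append(sorted(current.items()))
            current = {}
    # Final merge of all spilled runs plus what is still in memory.
    merged = {}
    for run in spills + [sorted(current.items())]:
        for key, partial in run:
            merged[key] = merged.get(key, 0) + partial
    return merged, len(spills)


pairs = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
print(aggregate_with_spill(pairs, max_entries_in_memory=2))
```

The same key can appear in several spilled runs, which is why the final merge has to combine partial results rather than simply concatenate them.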
The page-based memory management adopted by Tungsten is built on top of MemoryManager; that is, Tungsten abstracts the use of execution memory one step further, so that during Shuffle there is no need to care whether the data is stored on-heap or off-heap. Each memory page is defined by a MemoryBlock, and the two fields Object obj and long offset uniformly identify the address of a memory page in system memory.
An on-heap MemoryBlock is memory allocated in the form of a long array: its obj is the object reference to the array and its offset is the initial offset address of the long array within the JVM, so the two together locate the absolute address of the array in the heap. An off-heap MemoryBlock is a directly requested block of memory: its obj is null and its offset is the 64-bit absolute address of the block in system memory. Spark uses MemoryBlock to abstract on-heap and off-heap memory pages in a unified way, and uses a page table (pageTable) to manage the memory pages requested by each Task. All memory under Tungsten page management is represented by a 64-bit logical address consisting of a page number and an in-page offset:
· A 13-bit page number, which uniquely identifies a memory page. Spark must obtain a free page number before requesting a memory page.
· A 51-bit in-page offset, which is the offset address of the data within the page when the page is used to store data.
With unified addressing, Spark can locate memory on-heap or off-heap using pointers that hold 64-bit logical addresses. The whole Shuffle Write sorting process only needs to sort these pointers and requires no deserialization, which is very efficient and significantly improves memory access and CPU efficiency.
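The 13-bit page number and 51-bit offset layout described above can be made concrete with a little bit arithmetic. The function names below are illustrative only and are not taken from Spark's source; the sketch assumes the page number occupies the upper 13 bits of the 64-bit address.

```python
PAGE_NUMBER_BITS = 13
OFFSET_BITS = 64 - PAGE_NUMBER_BITS    # 51 bits for the in-page offset
MAX_PAGES = 1 << PAGE_NUMBER_BITS      # 8192 distinct page numbers
OFFSET_MASK = (1 << OFFSET_BITS) - 1   # lower 51 bits set


def encode_address(page_number, offset_in_page):
    """Pack a page number and an in-page offset into one 64-bit value."""
    if not 0 <= page_number < MAX_PAGES:
        raise ValueError("page number does not fit in 13 bits")
    if not 0 <= offset_in_page <= OFFSET_MASK:
        raise ValueError("offset does not fit in 51 bits")
    return (page_number << OFFSET_BITS) | offset_in_page


def decode_address(address):
    """Split a 64-bit logical address back into (page number, offset)."""
    return address >> OFFSET_BITS, address & OFFSET_MASK
```

With 13 bits for the page number, a task can address at most 8192 pages, and sorting records reduces to sorting these fixed-width integers.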
Spark’s storage memory and execution memory are managed in entirely different ways. For storage memory, Spark uses a LinkedHashMap to centrally manage all Blocks, which are converted from the RDD partitions that need to be cached. For execution memory, Spark uses AppendOnlyMap to store data during Shuffle, and in Tungsten sorting it even abstracts this into page-based memory management, opening up a new memory management mechanism on top of the JVM.
Analysis of Spark core components
The BlockManager is a component of Spark responsible for data storage and management. All data of drivers and executors is managed by the corresponding BlockManager.
The Driver holds the BlockManagerMaster, which maintains the metadata of the data managed internally by the BlockManager on each node. For example, it records the metadata changes caused by adding, deleting, or modifying blocks.
Each node has a BlockManager. After a BlockManager is created, the first thing it does is register with the BlockManagerMaster, which then creates the corresponding BlockManagerInfo for it. The operation principle of BlockManager is shown in the following figure:
The relationship between BlockManagerMaster and BlockManager is very similar to the relationship between NameNode and DataNode. The BlockManagerMaster stores and maintains the metadata of the data managed internally by each BlockManager. When a BlockManager adds, deletes, or modifies blocks, the metadata in the BlockManagerMaster changes accordingly. In the same way, NameNode maintains the metadata of DataNode, and when the data in a DataNode changes, the metadata in the NameNode changes with it.
Each node has a BlockManager, which has three important components:
· DiskStore: responsible for reading and writing disk data;
· MemoryStore: responsible for reading and writing memory data;
· BlockTransferService: responsible for establishing connections to the BlockManagers of other remote nodes, and for reading and writing the data of those remote BlockManagers.
After each BlockManager is created, the first thing it does is register with the BlockManagerMaster, which creates the corresponding BlockManagerInfo for it.
When BlockManager performs a write operation, for example when writing intermediate data produced while an RDD is being computed, or when persist() is specified explicitly, the data is written to memory first. If memory is insufficient, BlockManager uses its own algorithm to write part of the data to disk. In addition, if persist() specifies replication, the BlockTransferService is used to replicate a copy of the data to the BlockManager of another node.
When BlockManager performs a read operation, for example a Shuffle Read, it reads from the local DiskStore or MemoryStore if the data is available locally. If the data is not available locally, the BlockTransferService establishes a connection with a remote BlockManager that holds the data and reads the data from it. In a Shuffle Read, for instance, the data to be pulled is often not local, so the node that holds it is located and the data is pulled from the BlockManager on that node.
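The write and read paths just described can be sketched as a toy model. ToyBlockManager is a hypothetical class, not Spark's BlockManager: two dictionaries stand in for MemoryStore and DiskStore, a direct method call on a peer stands in for BlockTransferService, and peers are simply tried in turn, whereas a real lookup of block locations would go through the Driver.

```python
class ToyBlockManager:
    """Toy model of BlockManager read/write paths (hypothetical)."""

    def __init__(self, name, memory_capacity):
        self.name = name
        self.memory_capacity = memory_capacity
        self.memory_store = {}   # block_id -> bytes held in memory
        self.disk_store = {}     # block_id -> bytes on a simulated disk
        self.peers = []          # other ToyBlockManager instances

    def _memory_used(self):
        return sum(len(data) for data in self.memory_store.values())

    def put(self, block_id, data, replicate=False):
        # Memory first; fall back to disk when memory is insufficient.
        if self._memory_used() + len(data) <= self.memory_capacity:
            self.memory_store[block_id] = data
        else:
            self.disk_store[block_id] = data
        if replicate and self.peers:
            # Stand-in for BlockTransferService sending one replica.
            self.peers[0].put(block_id, data, replicate=False)

    def get_local(self, block_id):
        if block_id in self.memory_store:
            return self.memory_store[block_id]
        return self.disk_store.get(block_id)

    def get(self, block_id):
        data = self.get_local(block_id)
        if data is not None:
            return data, "local"
        for peer in self.peers:   # stand-in for a remote fetch
            data = peer.get_local(block_id)
            if data is not None:
                return data, "remote:" + peer.name
        return None, "missing"
```

The second element of the value returned by get() only exists to make the chosen path visible in this sketch.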
Whenever blocks are added, deleted, or modified through a BlockManager, the BlockStatus of the affected Block must be reported to the BlockManagerMaster. The BlockManagerMaster then adds, deletes, or changes the BlockStatus inside the BlockManagerInfo of that BlockManager, which is how the metadata is maintained.
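The registration and status-reporting flow can be pictured as a small driver-side registry. ToyBlockManagerMaster and its method names are hypothetical, and a plain string stands in for BlockStatus; the sketch only shows how per-BlockManager metadata lets the Driver answer where a Block lives.

```python
class ToyBlockManagerMaster:
    """Toy driver-side metadata registry (hypothetical, not Spark code)."""

    def __init__(self):
        # manager id -> {block_id: status}; plays the role of BlockManagerInfo
        self.block_manager_info = {}

    def register(self, manager_id):
        # The first thing a new BlockManager does after it is created.
        self.block_manager_info[manager_id] = {}

    def update_block_status(self, manager_id, block_id, status):
        blocks = self.block_manager_info[manager_id]
        if status is None:
            blocks.pop(block_id, None)   # the block was removed
        else:
            blocks[block_id] = status    # the block was added or changed

    def locations(self, block_id):
        # Which registered BlockManagers currently hold this block?
        return sorted(
            manager_id
            for manager_id, blocks in self.block_manager_info.items()
            if block_id in blocks
        )
```

A reader that misses locally could consult a registry like this to decide which remote BlockManager to contact.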
Underlying implementation of Spark shared variables
A very important feature of Spark is shared variables.
By default, if an external variable is used in the function of an operator, the value of the variable is copied to each task, and each task can only operate on its own copy. If multiple tasks need to share one variable, this approach does not work.
Spark provides two kinds of shared variables: the broadcast variable (Broadcast Variable) and the accumulator (Accumulator). A broadcast variable keeps only one copy of a variable on each node, more precisely one copy per Executor, which improves performance by reducing network transmission and memory consumption. It is a shared read-only variable that tasks cannot modify. An accumulator allows multiple tasks to operate on the same variable together.
Broadcast variable
Broadcast variables allow programmers to keep read-only variables of external data on each Executor instead of sending a copy to each task.
By default, each task keeps its own copy of the external variables it uses. When multiple tasks on an Executor use the same large external variable, this consumes a lot of Executor memory. We can therefore wrap the large external variable in a broadcast variable. In that case the Executor keeps a single copy of the variable, which is shared by all tasks on that Executor instead of each task holding its own copy. This reduces the memory used by the Spark job to some extent.
Spark also tries to use efficient broadcast algorithms to distribute broadcast variables to reduce communication costs.
The Broadcast Variable provided by Spark is read-only, and each Executor holds only one copy of it rather than one copy per task. It therefore reduces both the network cost of shipping the variable to the nodes and the memory consumed on the nodes. In addition, Spark uses an efficient broadcast algorithm internally to reduce network consumption.
You can create a broadcast variable from a variable by calling the broadcast() method of SparkContext. Then, within an operator’s function, each Executor keeps only one copy of the broadcast variable, and each task can use the broadcast variable’s value() method to retrieve the value. When a task starts running, the Executor does not fetch the broadcast variable in advance. Only when the task executes code that uses the broadcast variable does it request the variable from the Executor’s memory, as shown in the following figure:
The Executor then pulls the broadcast variable from the Driver through BlockManager and provides it to the task, as shown in the following figure:
Broadcasting large variables is a basic optimization commonly used in Spark development; it improves task execution performance by reducing memory usage.
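The lazy, once-per-Executor behavior described above can be simulated without a cluster. The classes ToyDriver and ToyExecutor are hypothetical stand-ins and do not use the Spark API: a dictionary lookup replaces the BlockManager transfer, and fetch_count exists only to show how many times the value leaves the Driver.

```python
class ToyDriver:
    """Holds the original broadcast values (hypothetical stand-in)."""

    def __init__(self):
        self.broadcasts = {}
        self.fetch_count = 0   # how many times a value was pulled

    def broadcast(self, broadcast_id, value):
        self.broadcasts[broadcast_id] = value
        return broadcast_id

    def fetch(self, broadcast_id):
        self.fetch_count += 1
        return self.broadcasts[broadcast_id]


class ToyExecutor:
    """Keeps one copy of each broadcast value for all of its tasks."""

    def __init__(self, driver):
        self.driver = driver
        self.cache = {}   # broadcast id -> value, one copy per executor

    def broadcast_value(self, broadcast_id):
        # Lazy: pulled from the driver on first use, then reused.
        if broadcast_id not in self.cache:
            self.cache[broadcast_id] = self.driver.fetch(broadcast_id)
        return self.cache[broadcast_id]

    def run_task(self, broadcast_id, key):
        # A task looks up a key in the shared, read-only table.
        return self.broadcast_value(broadcast_id).get(key)
```

Running five tasks across two executors in this model pulls the table from the driver twice, once per executor, rather than five times.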
Accumulator
An accumulator is a variable that is only added to through an associative operation and can therefore be supported efficiently in parallel. Accumulators can be used to implement counters (as in MapReduce) or sums.
Tasks running on the cluster add to the accumulator, and their updates are then sent to the Driver and merged there. (The Spark UI is created when the SparkContext is created, that is, on the Driver, so it can read the accumulator’s value.) The accumulated value can be read only on the Driver.
Spark provides the Accumulator so that multiple nodes can share one variable. An Accumulator supports only accumulation, but it does allow multiple tasks to operate on the same variable in parallel. A task can only add to an Accumulator and cannot read its value. Only the Driver program can read the value of an Accumulator.
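The split between tasks that can only add and a Driver that alone can read may be sketched as follows. The class and function names are hypothetical and the code does not use the Spark API; each inner list passed to run_job stands for the records of one task, and the merge call stands for the update being sent back to the Driver.

```python
class ToyAccumulator:
    """Driver-side accumulator; only the driver reads `value`."""

    def __init__(self, initial=0):
        self.value = initial

    def merge(self, partial):
        self.value += partial


class TaskSideAccumulator:
    """Add-only handle handed to a task (hypothetical stand-in)."""

    def __init__(self):
        self._partial = 0

    def add(self, amount):
        self._partial += amount

    @property
    def value(self):
        raise RuntimeError("accumulator value is not readable inside a task")


def run_job(partitions, accumulator):
    """Count negative records; each partition stands for one task."""
    for partition in partitions:
        handle = TaskSideAccumulator()
        for record in partition:
            if record < 0:
                handle.add(1)
        # The task's partial result is sent to the driver and merged there.
        accumulator.merge(handle._partial)
```

Because addition is associative, the order in which the partial results reach the driver does not change the final value.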
Accumulator’s underlying principle is as follows: