What is RDD?

Resilient Distributed Dataset (RDD) Resilient Distributed Dataset (RDD) is the most basic data abstraction in Spark. It represents an immutable and partitioned collection whose elements can be computed in parallel.

Spark’s lazy mechanism

RDD divides operations into two categories: Transformation and Action. No matter how many transformations are performed, the RDD does not actually perform the operation. The operation is triggered only when the Action operation is executed.
Because of lazy execution, Spark can run in memory more efficiently. The efficient shared memory mechanism avoids a large number of intermediate results, thus avoiding the performance cost caused by disk writes. In addition, internal storage objects can be JAVA objects, avoiding unnecessary serialization and deserialization.

Wide and narrow dependencies

Narrow dependency: one Partition of each parent RDD is used by at most one Partition of the RDD. Operations such as map, filter, and union generate narrow dependency. (No shuffle is generated)
Narrow dependence can be divided into two cases:
  • A partition of a child RDD corresponds to a partition of a parent RDD, such as map, filter, and Union operators
  • A partition with one child RDD corresponds to N partitions with parent RDD, for example, co-parjoin

Wide dependency: A parent RDD Partition can be used by multiple sub-RDD partitions, such as groupByKey, reduceByKey, and sortByKey. (Shuffle)
The wide dependence can be divided into two cases:
  • One parent RDD corresponds to multiple sub-RDD partitions, such as groupByKey, reduceByKey, sortByKey and other operators
  • One parent RDD corresponds to all child RDD partitions, such as joins without collaborative partitioning

RDD operation principle

  1. After the RDD object is created, the DAG graph is constructed according to the RDD dependencies, and the DAG graph is submitted to DAGScheduler for parsing.
  2. DAGScheduler is a high-level scheduler oriented to scheduling stages. DAGScheduler divides DAG into interdependent scheduling stages. The split scheduling stage is based on whether the RDD dependency is wide dependent. Each scheduling phase contains one or more tasks, which form a task set and are submitted to the underlying TaskScheduler for scheduling. In addition, DAGScheduler also records materialized actions such as which RDD is stored on disk, and seeks the optimal scheduling optimization of tasks, such as data local type. DAGScheduler also monitors the running scheduling process, and if a scheduling phase fails to run, it needs to be resubmitted.
  3. Each TaskScheduler serves only one SparkContext instance. TaskScheduler receives a collection of tasks sent from DAGScheduler. The TaskScheduler receives a task set and runs it on the Worker node one method at a time. If a task fails, the TaskScheduler is responsible for retrying it. If the TaskScheduler finds that a task has never completed, it may start the same task to run the same task, and the result of which task is completed first is used.
  4. Executors in Worke receive tasks from the TaskScheduler and run in multithreaded fashion, with each thread responsible for one task. TaskScheduler returns a result to the TaskScheduler after the task is completed. Different tasks return different results. ShuffleMapTask returns a MapStatus object, not the result itself; ResultTask Returns different methods according to the size of the result.

DAG

In Spark, the DAG graph (directed acyclic graph) is divided into different stages according to the dependency relationship between RDD. For narrow dependency, partition conversion can be completed in the same thread due to the deterministic partition dependency relationship, and narrow dependency is divided into the same stage by Spark. For wide dependency, the next stage can only start the following calculation after the shuffle processing of the parent RDD is completed.

Therefore, the overall idea of Spark dividing stages is as follows: Start from back to front, break off when wide dependency occurs, and divide into a stage. When a narrow dependency is encountered, the RDD is added to the stage. Therefore, in Figure 2,RDD C,RDD D,RDD E, AND RDD F are built in A stage,RDD A is built in A separate stage, and RDD B and RDD G are built in the same stage.
This distribution method enables different pipeline operations for different partitions, which is conducive to efficient operation and fault tolerance mechanism. Imagine that when C1- >D1 is finished, D1- >F1 can be directly run. In this way, parallel computing can effectively improve performance. At the same time, when errors occur, good stage division also reduces the cost of recalculation.

Partition

Partition is an important concept in Spark. It is the smallest unit of an RDD. An RDD consists of partitions distributed on each node. The number of partitions determines the number of tasks, and each task corresponds to one partition.
For example, Spark is used to read local text files. After reading, the contents are divided into multiple partitions, which form an RDD and can be executed on different machines.
Impact of Partition number:
  • If the number of partitions is too small, computing resources cannot be fully utilized. For example, if eight cores are allocated but the number of partitions is 4, half of the cores will not be utilized.
  • If the number of partitions is too large, computing resources can be fully utilized. However, a large number of tasks may affect the execution efficiency, mainly because task serialization and network transmission cost a lot of time.

RDD persistence

An important feature of Spark is the ability to persist RDD in memory. When an RDD is persisted, each node persists the partition of the RDD it operates to the memory and uses the partition of the memory cache in the repeated use of the RDD. In this way, in the scenario where multiple operations are performed repeatedly on an RDD, the RDD can be calculated only once, and the RDD can be directly used later, rather than repeatedly calculated.
  • To persist an RDD, simply call its cache() or persist() methods. When the RDD is first calculated, it is cached directly on each node. In addition, Spark’s persistence mechanism is automatically fault-tolerant. If any partition in the persistent RDD is lost, Spark automatically uses the source RDD to run transformation to recalculate the partition.
  • The difference between cache() and persist() is that cache() is a simplified form of persist(), and the underlying layer of cache() is a no-parameter version of the call to persist(MEMORY_ONLY). If you need to clear the cache from memory, you can use the unpersist() method.
  • Spark also implements data persistence during shuffle operations, such as writing data to disks, to avoid recalculating the entire process when nodes fail.

RDD Persistence policy

RDD persistence can be manually selected for different policies. For example, RDD can be persisted in memory, persisted to disk, persisted in serialized mode, and multiplexed with multi-persistent data. Simply pass in the corresponding StorageLevel when you call persist().

Memory is managed using the LRU reclaim algorithm. When a new RDD partition is computed, but there is not enough space to store it, the system reclaims one of its partitions from the least recently used RDD.
Unless the RDD is the RDD of the new partition, Spark keeps the old partition in the memory to prevent the partition of the same RDD from being rotated in and out.
How do I choose an RDD persistence strategy?
  1. By default, the highest performance is of course MEMORY_ONLY, but only if the memory is large enough to hold all the data for the entire RDD. Because no serialization or deserialization is performed, this part of the performance overhead is avoided. Subsequent operator operations on this RDD are based on pure in-memory data, do not need to read data from disk files, and have high performance. And there is no need to make a copy of the data and remotely transfer it to another node. However, it is important to note that in a real production environment, there are only a limited number of scenarios where this strategy can be used directly. If there is a large amount of data in the RDD (say, billions), using this persistence level directly will cause the JVM to run out of OOM memory exceptions.
  2. If a memory overflow occurs when using the MEMORY_ONLY level, it is recommended to try the MEMORY_ONLY_SER level. This level serializes RDD data and stores it in memory, where each partition is just a byte array, greatly reducing the number of objects and memory footprint. This level of performance overhead over MEMORY_ONLY is mainly the overhead of serialization and deserialization. However, subsequent operators can operate based on pure memory, so overall performance is relatively high. If the amount of data in the RDD is too large, the OOM memory may overflow.
  3. If no level of pure memory is available, it is recommended to use the MEMORY_AND_DISK_SER policy rather than the MEMORY_AND_DISK policy. Because at this point, it means that the RDD is very large and the memory cannot be completely laid down. Less data is serialized, saving memory and disk space. In addition, data is first cached in the memory. Data is written to disks only when the memory cache is insufficient.
  4. DISK_ONLY and the suffix _2 level are generally not recommended, because reading and writing data solely based on disk files can lead to a dramatic performance degradation, and sometimes it is better to recalculate all RDD’s. For the level whose suffix is _2, all data must be replicated and sent to other nodes. Data replication and network transmission incur high performance costs, and are not recommended unless high availability of jobs is required.

Checkpoint mechanism

The RDD uses the cache mechanism to read data from the memory. If no data is read, the RDD uses the checkpoint mechanism to read data. In this case, if there is no checkpoint mechanism, the parent RDD needs to be recalculated, so checkpoint is an important fault tolerance mechanism.
Val data = sc.textfile ("/ TMP /spark/1.data").cache() data.checkpoint data.countCopy the code

Write process:

The following states are passed during the RDD checkpoint:
[Initialized — > marked for checkpointing — > checkpointing in progress — > checkpointing]
  • The driver program uses RDD.checkpoint () to check which RDD needs to be checked. After checking, the RDD is managed by RDDCheckpointData. You also need to set the checkpoint storage path, which is usually on the HDFS.
  • Marked for checkpointing: After initialization, RDDCheckpointData will mark RDD as MarkedForCheckpoint.
  • Checkpointing in progress: Finalrdd.docheckpoint () is called after each job runs. FinalRdd scans back up the Computing chain. Mark an RDD to checkpoint as CheckpointingInProgress, Then broadcast the configuration files (such as core-site. XML) required for writing disks (such as HDFS) to the blockManager on other worker nodes. Is complete, start a job to do: (use RDD. Context. RunJob (RDD, CheckpointRDD writeToFile (path. ToString, broadcastedConf))).
  • Checkconservative: Job After checkpoint, remove all RDD dependency and set the RDD status to checkconservative. Then, impose a dependency on the RDD and set the parent RDD of the RDD to CheckpointRDD, which is responsible for reading checkpoint files on the file system and generating partitions of the RDD.

Checkpoint reading process

  • If an RDD is checked, partitions and dependencies are handled using the RDD’s checkpointRDD variable, which is ReliableCheckpointRDD. This is created in the Checkpoint write process. ReliableCheckpointRDD: ReliableCheckpointRDD: ReliableCheckpointRDD: ReliableCheckpointRDD
  • If not, go back and rely on it. Dependency means that there is no dependency because the dependency has been cut off. To obtain partition data, check checkpoint files saved from different partitions in the HDFS directory.
RDD requires checkpoints in two cases.
  1. Lineage is too long in DAG and too expensive to recalculate (as in PageRank).
  2. The benefits of Checkpoint on broad dependencies are even greater.