1. RDD Cache Mechanism: cache and persist

One reason Spark is so fast is that RDDs support caching. After a data set is successfully cached, subsequent operations fetch it directly from the cache. There is a risk of losing the cache, but thanks to the dependencies between RDDs, if the cached data of a partition is lost, only that partition needs to be recomputed.

The operators involved are persist, cache, and unpersist; persist and cache are transformations (they are lazy).

Caching writes the computed results to a chosen storage medium. The storage level is user-definable (a storage level defines the media used by the cache; memory, off-heap memory, and disk are currently supported).

By using caching, Spark avoids recomputing an RDD and greatly improves calculation speed. RDD persistence, or caching, is one of Spark's most important features, and it is a key ingredient in Spark's ability to support iterative algorithms and fast interactive queries.

One of the reasons Spark is so fast is its ability to persist (or cache) a data set in memory. When an RDD is persisted, each node stores the partitions it has computed in memory and reuses them in other actions on that data set (or data sets derived from it), which makes subsequent actions much faster.

An RDD is marked for persistence with the persist() method. It is only a mark because the statement that calls persist() does not compute and persist the RDD; persistence happens only when the first action triggers the actual computation. In short, persist() or cache() marks an RDD to be persisted; once persistence is triggered, the RDD is kept in the compute nodes' memory for reuse.
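As a minimal sketch of this behavior (the input path is a placeholder), persist() only marks the RDD; the first action fills the cache, and later actions reuse it:

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("/data/input.txt")     // placeholder path
lines.persist(StorageLevel.MEMORY_ONLY)        // only marks the RDD for caching
lines.count()                                  // first action: computes the RDD and fills the cache
lines.map(_.length).reduce(_ + _)              // later actions read the cached partitions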

There is a tradeoff between space and speed when deciding whether to cache data. In general, if an RDD is needed by multiple actions and is expensive to compute, it should be cached.

The cache may be lost, or data stored in memory may be evicted because memory is insufficient. The RDD cache's fault-tolerance mechanism guarantees that the computation still runs correctly even if the cache is lost: the lost data is recomputed through the series of transformations that produced the RDD. Because each partition of an RDD is independent, only the lost partitions need to be recomputed, not all of them.

1.1 Cache Level

Spark supports multiple cache levels:

  • MEMORY_ONLY: the default cache level. Stores the RDD in the JVM as deserialized Java objects. If memory is insufficient, some partitions are simply not cached.
  • MEMORY_AND_DISK: stores the RDD in the JVM as deserialized Java objects. If memory is insufficient, the partitions that do not fit are written to disk and read back from disk when needed.
  • MEMORY_ONLY_SER: stores the RDD as serialized Java objects (one byte array per partition). This saves storage space compared to deserialized objects but increases the CPU cost of reading them. Only available in Java and Scala.
  • MEMORY_AND_DISK_SER: similar to MEMORY_ONLY_SER, but partitions that overflow memory are stored on disk instead of being recomputed when they are used. Only available in Java and Scala.
  • DISK_ONLY: caches the RDD on disk only.
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2: same as the corresponding levels above, but each partition is replicated on two nodes in the cluster.
  • OFF_HEAP: similar to MEMORY_ONLY_SER, but the data is stored in off-heap memory. This requires off-heap memory to be enabled.

Two parameters are required to enable off-heap memory:

  • spark.memory.offHeap.enabled: whether off-heap memory is enabled; the default is false, and it must be set to true;
  • spark.memory.offHeap.size: the amount of off-heap memory; the default is 0, and it must be set to a positive value.
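As a minimal sketch, assuming the application builds its own SparkContext, both parameters can be set on the SparkConf (the application name and the 1g size are placeholder values):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("offheap-cache-demo")               // placeholder name
  .set("spark.memory.offHeap.enabled", "true")    // off by default
  .set("spark.memory.offHeap.size", "1g")         // must be a positive size
val sc = new SparkContext(conf)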

1.2 Using Cache

There are two methods for caching data: persist and cache. cache calls persist internally; it is a specialized form of persist, equivalent to persist(StorageLevel.MEMORY_ONLY). The following is an example:

// All storage levels are defined in the StorageLevel object
import org.apache.spark.storage.StorageLevel

fileRDD.persist(StorageLevel.MEMORY_AND_DISK)
fileRDD.cache()   // equivalent to persist(StorageLevel.MEMORY_ONLY)

In the Spark web UI, a cached RDD is shown with a green dot in the DAG visualization.

1.3 Removing the Cache

Spark automatically monitors cache usage on each node and evicts old data partitions according to a least-recently-used (LRU) policy. You can also remove a cached RDD manually with the rdd.unpersist() method.
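A small usage sketch, reusing the fileRDD from the example above:

fileRDD.unpersist()                    // remove the RDD's blocks from the cache
// fileRDD.unpersist(blocking = true)  // optionally wait until all blocks are removed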

2. RDD Fault Tolerance: checkpoint

2.1 Operator involved: checkpoint; it is also a transformation (lazy)

In addition to persistence, Spark provides a checkpoint mechanism for storing data. A checkpoint essentially writes an RDD to highly reliable storage, primarily for fault tolerance; it is implemented by writing the data to the HDFS file system.

This is the checkpoint functionality of an RDD: if the lineage is too long, the cost of recovering from a failure becomes too high, so it is better to checkpoint at an intermediate stage. If some nodes fail later and partitions are lost, the lineage can be replayed starting from the checkpointed RDD, which reduces the overhead.
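In a cluster, the checkpoint directory would normally point at HDFS; the namenode address and path below are placeholders:

// The checkpoint directory should live on reliable storage such as HDFS
sc.setCheckpointDir("hdfs://namenode:8020/spark/checkpoints")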

2.2 Differences between Cache and Checkpoint

The cache computes the RDD and stores it in memory, but the RDD's dependency chain cannot be discarded: when an executor fails, the cached RDD is lost and the dependency chain is used to replay the computation. A checkpoint, by contrast, stores the RDD in HDFS, which is reliable, multi-replica storage, so the dependency chain can be discarded; in other words, the lineage is severed.

2.3 Scenarios Where Checkpoint Applies

The following scenarios are suitable for using the checkpoint mechanism:

  1. The lineage in the DAG is too long, so recomputing it after a failure would be too expensive.

  2. The benefit of a checkpoint is even greater across wide dependencies.

Like cache, checkpoint is lazy.

val rdd1 = sc.parallelize(1 to 100000)

// Set the checkpoint directory
sc.setCheckpointDir("/tmp/checkpoint")

val rdd2 = rdd1.map(_ * 2)
rdd2.checkpoint

// checkpoint is lazy: nothing has been written yet
rdd2.isCheckpointed

// RDD dependencies before the checkpoint
rdd2.dependencies(0).rdd
rdd2.dependencies(0).rdd.collect

// Perform an action to trigger the checkpoint
rdd2.count

rdd2.isCheckpointed

// Check the dependencies again: after the checkpoint, the lineage of the RDD
// is truncated and now starts from the checkpoint RDD
rdd2.dependencies(0).rdd
rdd2.dependencies(0).rdd.collect

// Check the checkpoint file that the RDD depends on
rdd2.getCheckpointFile

Note: checkpoint files are not deleted after the program that created them finishes.
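If automatic cleanup is desired, the cleaner can be asked to remove checkpoint files once the corresponding RDD goes out of scope; a sketch, assuming the spark.cleaner.referenceTracking.cleanCheckpoints setting (which is disabled by default):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")  // assumed setting; default is false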