This article comes from the OPPO Internet technology team. It is the second article in the series Analyzing Spark Data Partitioning and focuses on the partitioning of Spark RDDs. The series consists of three articles; you are welcome to stay tuned.

  • Part 1: Analysis of sharding in Hadoop;

  • Part 2: Analysis of Spark RDD partitioning;

  • Part 3: Analysis of data partitioning in Spark Streaming and TiSpark.


Spark

The following uses Spark on YARN as an example to describe how Spark works.

Task Operation Procedure

1. The client submits the Application to the ResourceManager (RM), and the RM checks whether the cluster has sufficient resources.

2. The RM selects a NodeManager in the cluster and starts the ApplicationMaster (AM) on it (cluster mode).

3. The Driver starts as a process on the NodeManager node where the AM resides.

4. The AM applies to the ResourceManager for resources and starts executors on the NodeManagers.

5. The Driver begins to schedule tasks: Transformation operations build the RDD lineage graph (the DAG), and finally the Driver invokes an action to trigger a job and schedule its execution.

6. The DAGScheduler is responsible for stage-level scheduling: it divides the DAG into several Stages, packages each Stage into a TaskSet, and sends it to the TaskScheduler for scheduling.

7. The TaskScheduler is responsible for task-level scheduling: it dispatches the tasks in each TaskSet to executors according to the specified scheduling policy.

Spark RDD

An RDD (Resilient Distributed Dataset) is characterized by five properties:

1. compute:

Tasks compute an RDD on a per-partition basis; the data of each partition is produced by compute. Different RDD subclasses implement their own compute method.

2. getPartitions:

An RDD is a set of partitions: an RDD has one or more partitions, and the number of partitions determines the parallelism of Spark tasks.

3. getDependencies:

Each RDD has a list of dependencies on other RDDs (the dependency list of a source RDD is empty). This chain of dependencies is called the lineage.

4. getPreferredLocations:

During task scheduling, Spark tries to assign tasks to the machines where the data resides, avoiding data transfer between machines. getPreferredLocations returns the preferred locations of an RDD partition; it usually matters only for RDDs that read data from external storage, such as HadoopRDD and ShuffleRDD.

5. partitioner:

Determines which partition each record is assigned to. For a non-key-value RDD the partitioner is None; for a key-value RDD the default partitioner is HashPartitioner. During shuffle operations such as reduceByKey and sortByKey, the partitioner determines how the shuffle output of the parent RDD is mapped to partitions.

The first two functions are required of all RDDs, the last three are optional, and all RDDs inherit this interface. A simplified sketch of the interface follows.
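As a rough illustration (a simplified sketch based on Spark's RDD abstract class; the signatures are abridged, not exact), the five properties correspond to members like these:

```scala
import org.apache.spark.{Dependency, Partition, TaskContext}

// Simplified sketch of the five-property RDD contract (abridged; not the exact Spark signatures).
abstract class SketchRDD[T] {
  // Required: compute the records of one partition.
  def compute(split: Partition, context: TaskContext): Iterator[T]
  // Required: the set of partitions of this RDD.
  protected def getPartitions: Array[Partition]
  // Optional: dependencies on parent RDDs (empty for a source RDD).
  protected def getDependencies: Seq[Dependency[_]] = Seq.empty
  // Optional: preferred executor locations for a partition (e.g., HDFS block hosts).
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
  // Optional: how key-value records map to partitions; None for non-key-value RDDs.
  val partitioner: Option[org.apache.spark.Partitioner] = None
}
```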

Spark Partition

Because an RDD may hold a large amount of data, it is partitioned, and the partitions are stored across the nodes of the cluster to make computation convenient. When we apply operations to an RDD, the data in its partitions is therefore processed in parallel.

In other words, the raw data to be processed is divided into multiple parts according to the relevant logic, and each part corresponds to one partition of the RDD. The number of partitions determines the number of tasks and thus affects the parallelism of the program. Each RDD type has its own Partition implementation.

HadoopRDD

Spark often reads files from HDFS to generate an RDD for computation and analysis. An RDD generated by reading files from HDFS is a HadoopRDD.

HadoopRDD rewrites three methods of the RDD interface:

  1. override def getPartitions: Array[Partition]
  2. override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)]
  3. override def getPreferredLocations(split:Partition): Seq[String]

The logic that determines the number of splits actually lives in InputFormat.getSplits, which getPartitions calls.

InputFormat is an interface, org.apache.hadoop.mapred.InputFormat, whose getSplits method computes the input splits.

According to the source-code analysis, in the case of HadoopRDD, the number of partitions is determined before the RDD is generated, by Hadoop parameters. We can adjust it through:

spark.hadoop.mapreduce.input.fileinputformat.split.minsize;
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize;

to change the number of partitions of a HadoopRDD. A configuration sketch follows.
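For example, a minimal sketch (the input path is hypothetical and the values are illustrative, not recommendations; the exact effect depends on the InputFormat in use):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: tuning the Hadoop split size, and hence the HadoopRDD partition count.
val spark = SparkSession.builder()
  .appName("hadooprdd-split-demo")
  // Larger minimum split -> fewer, bigger partitions; smaller maximum split -> more, smaller partitions.
  .config("spark.hadoop.mapreduce.input.fileinputformat.split.minsize", (64L * 1024 * 1024).toString)
  .config("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize", (256L * 1024 * 1024).toString)
  .getOrCreate()

val rdd = spark.sparkContext.textFile("hdfs:///path/to/input") // hypothetical path
println(s"partitions = ${rdd.getNumPartitions}")
```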

Partitions in Spark SQL

Spark SQL transforms an SQL statement through a logical operator tree into a physical operator tree.

In the physical operator tree, SparkPlan nodes of leaf type are responsible for creating RDDs from scratch, and each non-leaf SparkPlan node is equivalent to a Transformation on an RDD, i.e., calling its execute() function produces a new RDD. The collect() operation triggers the computation and returns the result to the user.

We now focus on analyzing the leaf nodes:

In Spark SQL, SparkPlan of type LeafExecNode is responsible for the creation of the initial RDD.

HiveTableScanExec generates a HadoopRDD based on the HDFS information stored for Hive data tables, while FileSourceScanExec generates a FileScanRDD based on the source files where the data table resides.

When reading and writing Hive metastore Parquet tables, the conversion is controlled by spark.sql.hive.convertMetastoreParquet, which defaults to true.

If it is true, org.apache.spark.sql.execution.FileSourceScanExec is used; otherwise, org.apache.spark.sql.hive.execution.HiveTableScanExec is used.
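A quick way to check which operator is chosen (the table name is hypothetical):

```scala
// Sketch: inspecting which scan operator handles a Hive Parquet table.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.sql("SELECT * FROM some_parquet_table").explain()
// With true, the physical plan should show a FileScan (FileSourceScanExec);
// with false, a HiveTableScan (HiveTableScanExec).
```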

FileSourceScanExec can create either a bucketed RDD or a non-bucketed RDD; either way, the result is a FileScanRDD.

The RDD created for a non-bucketed table is analyzed below.

The getPartitions method of FileScanRDD:

override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray

The split size is governed by maxSplitBytes, whose value is determined by the following three parameters: spark.sql.files.maxPartitionBytes (defaultMaxSplitBytes), spark.sql.files.openCostInBytes (openCostInBytes), and spark.default.parallelism (defaultParallelism).
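Paraphrased from Spark's FileSourceScanExec (the non-bucketed read path; details vary slightly across versions), the computation looks roughly like this:

```scala
// Sketch of the maxSplitBytes computation (paraphrased; version-dependent).
def maxSplitBytes(
    fileSizes: Seq[Long],       // sizes of the selected files, in bytes
    defaultMaxSplitBytes: Long, // spark.sql.files.maxPartitionBytes, default 128 MB
    openCostInBytes: Long,      // spark.sql.files.openCostInBytes, default 4 MB
    defaultParallelism: Int     // spark.default.parallelism
): Long = {
  // Each file is charged an "open cost" so that many small files still spread across cores.
  val totalBytes = fileSizes.map(_ + openCostInBytes).sum
  val bytesPerCore = totalBytes / defaultParallelism
  math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
}
```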

Conclusion:

To make maxSplitBytes larger, i.e., to reduce the number of partitions, increase defaultMaxSplitBytes (spark.sql.files.maxPartitionBytes), and increase spark.sql.files.openCostInBytes as well.

To make maxSplitBytes smaller, i.e., to increase the number of partitions, decrease spark.sql.files.maxPartitionBytes, and decrease spark.sql.files.openCostInBytes as well.

The RDD that FileSourceScanExec creates for bucketed tables is analyzed next.

Source-code analysis shows that the number of partitions of a bucketed table has a one-to-one relationship with the number of buckets, as the sketch below illustrates.
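A small check of this behavior (the table name is hypothetical; assumes the default spark.sql.sources.bucketing.enabled = true):

```scala
// Sketch: reading a bucketed table yields one partition per bucket.
val df = spark.range(0, 10000).toDF("id")
df.write.bucketBy(8, "id").saveAsTable("t_bucketed") // 8 buckets

println(spark.table("t_bucketed").rdd.getNumPartitions) // expected: 8, one per bucket
```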

HiveTableScanExec

HiveTableScanExec generates a HadoopRDD based on the HDFS information stored for Hive data tables.

Generally, HiveTableScanExec determines partitions according to the number and size of the files.

For example, reading 2048 MB of data with the HDFS block size set to 128 MB:

  1. The directory contains 1000 small files.

Answer: 1000 partitions are generated, one per file.

  2. There is only one 2048 MB file.

Answer: 16 partitions are generated (2048 / 128).

  3. There is one 1024 MB large file, and the remaining 999 files total 1024 MB.

Answer: 1007 partitions are generated (1024 / 128 = 8 for the large file, plus 999 for the small files).

For tuning of the HiveTableScanExec type, see HadoopRDD.

RDD transformation

How many partitions does the new RDD have when one RDD is transformed into another by a Transformation? The rules are listed below, followed by a quick check.

  1. filter(), map(), flatMap(), distinct()

The number of partitions equals that of the parent RDD.

  2. rdd.union(other_rdd)

The number of partitions is rdd_size + other_rdd_size.

  3. rdd.intersection(other_rdd)

The number of partitions is max(rdd_size, other_rdd_size).

  4. rdd.subtract(other_rdd)

The number of partitions is rdd_size.

  5. rdd.cartesian(other_rdd)

The number of partitions is rdd_size × other_rdd_size.
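A quick, hedged check of these rules in spark-shell (the intersection result assumes spark.default.parallelism is not explicitly set):

```scala
// Sketch: verifying the partition-count rules above.
val a = spark.sparkContext.parallelize(1 to 100, 4)   // 4 partitions
val b = spark.sparkContext.parallelize(50 to 150, 6)  // 6 partitions

println(a.map(_ * 2).getNumPartitions)      // 4:  same as the parent RDD
println(a.union(b).getNumPartitions)        // 10: 4 + 6
println(a.intersection(b).getNumPartitions) // 6:  max(4, 6), assuming spark.default.parallelism is unset
println(a.subtract(b).getNumPartitions)     // 4:  rdd_size
println(a.cartesian(b).getNumPartitions)    // 24: 4 x 6
```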

RDD coalesce and repartition

Sometimes the number of partitions of an RDD needs to be reset. For example, when an RDD has many partitions but relatively little data, a large partition count increases task parallelism, but each partition may end up with very little data; the inter-node communication time then becomes amplified relative to the total task execution time. A reasonable number of partitions therefore needs to be set.

Two methods can be used to reconfigure RDD partitions: coalesce() and repartition().

Repartition is the special case of coalesce in which shuffle is true; in RDD.scala, repartition(n) simply calls coalesce(n, shuffle = true). A small check follows.
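An equivalence sketch (assuming a spark-shell session where spark is available):

```scala
// Sketch: repartition(n) behaves like coalesce(n, shuffle = true).
val rdd = spark.sparkContext.parallelize(1 to 1000, 4)
println(rdd.repartition(8).getNumPartitions)              // 8
println(rdd.coalesce(8, shuffle = true).getNumPartitions) // 8: same effect as repartition(8)
```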

In distributed computing, each node computes only part of the data, that is, processes only one partition. To obtain all the values of a key, for example in reduceByKey or groupByKey, the data with the same key must be pulled to the same partition, so the data in the original partitions must be scrambled and reorganized. This process of repartitioning data according to certain rules is called a Shuffle.

Shuffle is the bridge between Map and Reduce: it describes the process of transferring data from Map to Reduce.

When increasing parallelism, an extra shuffle can be advantageous. For example, if some of the input files are not splittable, the partitions corresponding to the large files will hold a large number of records, and the data will not be spread across as many partitions as possible to use all of the CPUs that have been requested. In this case, using repartition to regenerate more partitions, so that later operators get the required degree of parallelism, provides a significant performance boost.

Analysis of the coalesce function's source code:

shuffle = true:

Based on the new number of partitions, each input partition picks a random starting output partition and then assigns its records to the output partitions in round-robin fashion; a hash partitioner on the generated keys routes each record to its output partition.
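A self-contained sketch of that distribution scheme (a paraphrase of what coalesce does internally, not the actual Spark code):

```scala
import scala.util.Random

// Sketch: how coalesce(shuffle = true) spreads one input partition's records across
// output partitions: start at a random position, then assign round-robin.
def distributePartition[T](partitionIndex: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
  var position = new Random(partitionIndex).nextInt(numPartitions) // random start, seeded per input partition
  items.map { t =>
    position += 1
    (position % numPartitions, t) // the key picks the output partition (Spark mods via HashPartitioner)
  }
}

// Example: spread 10 records from input partition 0 over 4 output partitions.
distributePartition(0, (1 to 10).iterator, 4).foreach(println)
```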

shuffle = false:

Next, analyze the getPartitions method in the CoalescedRDD source code.

PartitionCoalescer is used to:

  1. Ensure that each CoalescedRDD partition corresponds to roughly the same number of parent RDD partitions.

  2. Keep each CoalescedRDD partition as local as possible to its parent RDD partitions. For example, if partition 1 of the CoalescedRDD corresponds to partitions 1 to 10 of its parent RDD, and partitions 1 to 7 are on node 1.1.1.1, then node 1.1.1.1 is chosen for partition 1 of the CoalescedRDD. The purpose is to reduce data communication between nodes and improve processing capacity;

  3. Distribute CoalescedRDD partitions across different nodes. See the DefaultPartitionCoalescer class for the concrete implementation.

The following uses an example to analyze Repartition and Coalesce.

Assume that the source RDD has N partitions and needs to be repartitioned into M partitions.

Repartition implementation:

If N < M

Generally, the data is unevenly distributed across the N partitions, so a HashPartitioner is used to reshuffle the data into M partitions; shuffle must therefore be set to true.

Coalesce implementation:

If N > M

  1. If N and M are of the same order of magnitude (for example, N is 1000 and M is 100), shuffle can be set to false: the parent's partitions are merged locally into M partitions, no shuffle occurs, and the parent and child RDDs have a narrow dependency.

  2. If M is greater than N, coalesce with shuffle = false is ineffective: no shuffle is performed, and the partition count cannot grow between the parent and child RDDs. In other words, when shuffle is false and the value passed in is greater than the existing number of partitions, the number of partitions of the RDD stays unchanged; the partition count cannot be increased without a shuffle.

  3. If N is greater than M and the two differ drastically (for example, coalescing to a single partition), coalesce with shuffle = false forces the upstream computation to run at the reduced parallelism as well: (executor count − generated partition count) executors run empty, reducing efficiency. In that case, setting shuffle to true (i.e., using repartition) keeps the upstream computation at its original parallelism. See the usage sketch below.
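A usage sketch of these cases (spark-shell, with spark available; partition counts are illustrative):

```scala
// Sketch: coalesce vs. repartition for N = 1000 source partitions.
val rdd = spark.sparkContext.parallelize(1 to 1000000, 1000) // N = 1000

val merged = rdd.coalesce(100)  // N > M, same order of magnitude: narrow dependency, no shuffle
val single = rdd.repartition(1) // drastic reduction: the shuffle keeps the upstream stage at 1000 tasks
val wider  = rdd.coalesce(2000) // shuffle = false cannot increase the partition count

println(merged.getNumPartitions) // 100
println(single.getNumPartitions) // 1
println(wider.getNumPartitions)  // 1000: unchanged
```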

Conclusion

Spark is one of the most widely used big data computing engines, popular among data analysts and known for its fast processing speed. Through the above analysis, we can allocate reasonable computing resources, including CPU, memory, and executors, to our computing tasks, making the cluster more efficient and obtaining more task output from the same computing resources.