
This article summarizes a number of Spark performance tuning methods based on practical experience with Spark and on its internals. The summaries are based on spark-1.0.0; for the recently released Spark-1.1.0, the article also describes several optimizations introduced in that version.

Spark performance tuning

Executor and partition

An Executor is a separate JVM process, and each task is executed by a separate thread. The maximum number of concurrent tasks an Executor can perform is the same as the number of cores it has. Data cached during execution is stored in the Executor’s global space. Based on the above, we can conclude that:

  1. Tasks executed within the same Executor can share the same data cache. This is what Spark calls the PROCESS_LOCAL level of data locality.
  2. An Executor can execute the same number of concurrent tasks as it has cores.
  3. Concurrent tasks may interfere with each other. For example, some tasks occupy too much memory, causing other concurrent tasks to fail.
  4. Executors must register with and communicate with the Driver, so too many Executors place a burden on the Driver.

When a stage is divided into tasks, the number of tasks equals the number of partitions. Reducing the number of partitions therefore reduces the number of tasks and increases the amount of computation handled by each task. Since the serialization, dispatch, runtime preparation, and result collection of the tasks themselves consume Driver and Executor resources, reducing the number of tasks reduces this overhead.

In practice, each Executor can be configured with multiple cores, resulting in fewer Executors and better data locality. Based on the configured number of cores and the size of the partition data, you can estimate the minimum memory required by an Executor: number of concurrent tasks x single partition size + number of cached partitions x single partition size. For example, assuming 4 concurrent tasks, 256 MB partitions, and 2 cached partitions, an Executor needs at least 4 x 256 MB + 2 x 256 MB = 1.5 GB. The number of partitions depends on business logic. To make full use of the computing resources, aim for: number of partitions x number of concurrent jobs >= number of Executors x number of cores per Executor. The number of concurrent jobs refers to the jobs generated when RDDs invoke actions; jobs whose stages have no dependencies on one another can be executed concurrently.

| Parameter | Default value | Description |
| --- | --- | --- |
| spark.executor.instances | (none) | Number of Executors an application has |
| spark.executor.cores | 1 | Number of cores available to a single Executor |
| spark.executor.memory | 512m | Maximum memory for a single Executor |

The Spark configuration items
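As an illustration, the following sketch shows one way these configuration items might be set when building a SparkConf; the numbers are made up, sized from the memory estimate above with some headroom, and the same values can equally be passed to spark-submit.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sizing: 4 concurrent tasks and 2 cached partitions of ~256 MB
// each suggest a minimum of roughly 1.5 GB, so 2g is requested for headroom.
val conf = new SparkConf()
  .setAppName("tuning-example")
  .set("spark.executor.instances", "10") // number of Executors for the application
  .set("spark.executor.cores", "4")      // concurrent tasks per Executor
  .set("spark.executor.memory", "2g")    // maximum memory per Executor

val sc = new SparkContext(conf)
```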

Caching and data locality

An RDD's persist function is used to avoid recomputation. If an RDD will be used by different jobs, cache it with persist so that it is not computed more than once. If the cache is stored in memory, call unpersist after the computation finishes to release the cache so that the memory can be reclaimed.
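A minimal sketch of this pattern, assuming an existing SparkContext sc and a hypothetical input path: an RDD is cached, reused by two jobs, and then released.

```scala
import org.apache.spark.SparkContext._   // needed for pair-RDD operations in Spark 1.x
import org.apache.spark.storage.StorageLevel

// Hypothetical input; the filtered RDD is reused by two separate actions (jobs).
val errors = sc.textFile("hdfs:///data/access.log")
  .filter(_.contains("ERROR"))
  .persist(StorageLevel.MEMORY_ONLY)   // cache the result of the first computation

val total = errors.count()             // job 1: computes the RDD and fills the cache
val byHost = errors                    // job 2: reads from the cache instead of recomputing
  .map(line => (line.split(" ")(0), 1L))
  .reduceByKey(_ + _)
  .collect()

errors.unpersist()                     // release the cache so the memory can be reclaimed
```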

After persist is called on an RDD, its result is written to the cache the first time the RDD is computed; the cache level is determined by the StorageLevel. Cache writes go through the BlockManager, and the cache information is synchronized to the BlockManagerMaster. When the RDD is evaluated again, the BlockManagerMaster is first queried for a cache; if the cache resides in another BlockManager, it is first transferred to local storage.

The data locality of a task is closely related to the cache location. PreferredLocs are determined when a task is created and are then used by the TaskSetManager. When computing PreferredLocs, the cache location of the partition the task works on is given priority. If there is no cache, the preferredLocations specified by the RDD are used; such RDDs are generally data-source RDDs. If neither is present, the location of the first partition of the first narrow dependency of the RDD takes precedence.

Spark has four levels of data locality: PROCESS_LOCAL (same Executor), NODE_LOCAL (same machine), RACK_LOCAL (same rack), and ANY (anywhere else). In practice, some partitions are so large that if a NODE_LOCAL or RACK_LOCAL task is created, the cached partition data is transferred between Executors. This transfer not only consumes network bandwidth but may also fill up the memory of the new Executor, resulting in OOM. The net result is long transfer times and possibly Executor crashes. Therefore, letting data locality degrade is not recommended when partitions are large. If the Driver logs show a lot of NODE_LOCAL or RACK_LOCAL tasks and computing performance degrades, try increasing the wait time before locality is allowed to degrade (see the sketch after the table below), or even restrict tasks to PROCESS_LOCAL only.

| Parameter | Default value | Description |
| --- | --- | --- |
| spark.locality.wait | 3000 (ms) | Wait time before degrading data locality |
| spark.locality.wait.process | spark.locality.wait | Wait time before degrading from PROCESS_LOCAL |
| spark.locality.wait.node | spark.locality.wait | Wait time before degrading from NODE_LOCAL |
| spark.locality.wait.rack | spark.locality.wait | Wait time before degrading from RACK_LOCAL |

The Spark configuration items
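If the Driver logs show many NODE_LOCAL or RACK_LOCAL tasks together with degraded performance, the wait times can be raised; a sketch with illustrative values only:

```scala
import org.apache.spark.SparkConf

// Illustrative values: wait longer before the scheduler gives up on
// better locality and degrades the placement level.
val conf = new SparkConf()
  .set("spark.locality.wait", "10000")         // base wait (ms) before degrading locality
  .set("spark.locality.wait.process", "30000") // be especially patient about leaving PROCESS_LOCAL
```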

RDD preferredLocations

An RDD can optionally implement getPreferredLocations(split: Partition): Seq[String]. This method returns the preferred locations of a partition, usually the locations closest to the partition's data source. For example, if HDFS is the data source, it returns the nodes where the HDFS blocks of the partition reside. When implementing your own RDD, it is best to implement this method whenever possible to keep tasks data-local.
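A minimal sketch of a custom RDD that reports preferred locations, assuming each partition's data is known to live on a particular host (HostAwarePartition and HostAwareRDD are hypothetical names):

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition that remembers which host holds its data.
case class HostAwarePartition(index: Int, host: String, records: Seq[String]) extends Partition

class HostAwareRDD(sc: SparkContext, parts: Seq[HostAwarePartition])
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] = parts.toArray[Partition]

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    split.asInstanceOf[HostAwarePartition].records.iterator

  // Tell the scheduler which node is closest to this partition's data so that
  // tasks can be placed with NODE_LOCAL (or better) locality.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[HostAwarePartition].host)
}
```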

TaskSet Scheduling mode

At the TaskSet level, Spark provides two scheduling modes: FIFO and FAIR. FIFO mode selects TaskSets according to the order of jobs and stages. In FAIR mode, multiple pools can be configured; each pool has its own weight and minShare, where the weight is the pool's priority and minShare guarantees that the pool gets at least that many cores. In practice, FIFO mode leads to long waiting times for some jobs when many jobs run concurrently, while FAIR mode performs well (see the configuration sketch below).

The Spark configuration items
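A minimal sketch of enabling FAIR scheduling, assuming a pool file at the hypothetical path /root/fairscheduler.xml that defines a pool named "production" with its weight and minShare:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Enable FAIR scheduling and point Spark at the pool definitions.
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/root/fairscheduler.xml")
val sc = new SparkContext(conf)

// Jobs submitted from this thread go to the "production" pool, whose weight
// and minShare are defined in the XML file.
sc.setLocalProperty("spark.scheduler.pool", "production")
```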

Master mode selection

In the spark-submit job submission script provided by Spark, if YARN is used as the resource manager, the master can be set to yarn-client or yarn-cluster. In yarn-client mode, the submitting application itself hosts the Driver; in yarn-cluster mode, the Driver is submitted to run on a node inside the YARN cluster. Because the Driver and the Executors communicate frequently, the Driver should ideally be located inside the cluster. That is, if yarn-client is used, the application should run inside the cluster, whereas yarn-cluster guarantees that the Driver is inside the cluster.

Some application scenarios require continuously submitting computations to one SparkContext, which may therefore have a very long lifetime. In yarn-cluster mode it is hard to keep submitting computing tasks to the same Driver, and interaction between the application and the Driver is difficult; yarn-client mode is the better fit for this case.

The Spark configuration items

SparkContext reuse

Some scenarios require a SparkContext to continuously receive computing tasks, which tend to be latency-sensitive (on the order of seconds) and may arrive concurrently (for example, tasks submitted by multiple users). The yarn-client mode suits this scenario: the Driver resides in the application, which can continuously submit tasks to it and process the returned results. The potential risk of this pattern is that both the Driver and the Executors run for a long time, which can lead to memory leaks.

In practice, after an RDD has been cached in memory, calling unpersist does not free the memory immediately; it waits for the garbage collector to reclaim it. For the garbage collector, the CMS collector is recommended to avoid long garbage collection pauses.

With proper garbage collection on the Driver and Executors, performance generally remains consistent. However, if computational performance still deteriorates over time, you can restart the SparkContext to resolve the problem: because the Driver and Executors are recreated from scratch when the SparkContext is restarted, the original performance is restored. The restart is done by calling sparkContext.stop() in the application after all current tasks are complete, dropping the SparkContext reference, and then creating a new SparkContext.
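A rough sketch of such a restart inside a long-lived application, assuming all in-flight jobs have already finished when it is called:

```scala
import org.apache.spark.{SparkConf, SparkContext}

def newContext(): SparkContext =
  new SparkContext(new SparkConf().setAppName("long-running-service"))

var sc = newContext()

// Call only after every running job has completed and performance has degraded.
def restartContext(): Unit = {
  sc.stop()          // shut down the old Driver-side context and release its Executors
  sc = newContext()  // drop the old reference and create a fresh SparkContext
}
```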

The Driver needs to upload the Spark Jar package to the cluster to start each Executor; this jar is about 130 MB. When an Executor receives a task, it transfers the files and Jar package that the task depends on to the local machine. The Jar package in this case is the application package, which together with its dependencies is often around 100 MB. In scenarios where computing tasks are time-sensitive, spending 10 seconds on Jar distribution is unacceptable. This problem can be solved by pre-distribution: first upload the Spark Jar and the application Jar to the same location on each node, such as /root/sparkjar.

Avoid distributing Jar packages when the Driver starts:

  1. Set the SPARK_JAR environment variable on the Driver to empty to prevent the Spark Jar from being uploaded.
  2. In the yarn-site.xml configuration file, set yarn.application.classpath to the location of the Spark Jar in addition to the default values.

Avoid distributing dependencies and Jar packages when tasks start:

  1. Set the paths in spark.files and spark.jars to local:/root/sparkjar so that Executors copy them from the local filesystem.
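A sketch of the task-side settings, assuming the application Jar and a dependent file have already been copied to /root/sparkjar on every node (the file names are hypothetical); the local: scheme tells Executors to read them from their own filesystem instead of downloading them:

```scala
import org.apache.spark.SparkConf

// Pre-distributed files on every node; local: avoids per-task distribution.
val conf = new SparkConf()
  .set("spark.jars", "local:/root/sparkjar/app.jar")   // hypothetical application Jar
  .set("spark.files", "local:/root/sparkjar/app.conf") // hypothetical dependent file
```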

Serialization mode

Serialization is used in Spark for task transmission, task result collection, shuffle, disk-level caching, and broadcast variable transmission. The rule is that every operation involving network transport or disk caching serializes on write and deserializes on read. Spark provides two serializers: JavaSerializer (the default) and KryoSerializer. According to the Spark documentation, KryoSerializer offers much better performance (about 10 times that of JavaSerializer), but it does not support every Serializable class; for unsupported classes you need to provide your own serialization.

The Spark configuration items
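A minimal sketch of switching to Kryo in Spark 1.0/1.1, where ClickEvent and MyKryoRegistrator are hypothetical application classes registered through a custom KryoRegistrator:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical record type used by the job.
case class ClickEvent(userId: Long, url: String)

// Register application classes with Kryo so they serialize compactly.
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ClickEvent])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyKryoRegistrator")
```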

Optimizations in Spark-1.1.0

Sort-based Shuffle

In the shuffle provided by Spark 1.0.0, if there are C cores, M Mappers, and R Reducers, each Mapper generates R files, M * R files in total. With shuffle file consolidation enabled, this drops to min(C, M) * R files. Now, assuming R is large (e.g., 10,000), each Mapper writes its output to 10,000 files, which leads to heavy disk I/O, and the data compression and serialization performed while writing also consumes a lot of memory.

Sort-based shuffle lets each Mapper write its output to a single file. The records in this file are sorted by partition, so each Reducer can read exactly its own range of records. To support this, sort-based shuffle also writes an index file recording the start and end offsets of each partition.

The Spark configuration items
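A sketch of selecting the shuffle implementation in Spark 1.1.0 (the hash-based alternative with file consolidation is shown as a comment for comparison):

```scala
import org.apache.spark.SparkConf

// Sort-based shuffle: one sorted output file per Mapper plus an index
// of per-partition start/end offsets.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "sort")

// Hash-based alternative, consolidating Mapper outputs down to
// min(C, M) * R files:
// conf.set("spark.shuffle.manager", "hash")
//     .set("spark.shuffle.consolidateFiles", "true")
```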

TorrentBroadcast

Prior to Spark-1.1.0, the broadcast variable factory was HttpBroadcastFactory, where all Executors fetch broadcast variable values from the Driver. Spark-1.1.0 introduces TorrentBroadcastFactory and makes it the default; it distributes broadcast variables in a manner similar to BitTorrent downloads. The Driver splits the variable into data blocks; Executors fetch the blocks and merge them back into the value, and each block is fetched from a randomly chosen Executor that already holds it.

| Parameter | Default value | Description |
| --- | --- | --- |
| spark.broadcast.factory | org.apache.spark.broadcast.TorrentBroadcastFactory | Broadcast variable implementation |

The Spark configuration items
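A short usage sketch, assuming an existing SparkContext sc; with the Torrent factory (the default in 1.1.0), Executors fetch broadcast blocks from each other rather than all pulling from the Driver:

```scala
// Broadcast a small lookup table once and read it inside tasks.
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2, "c" -> 3))

val mapped = sc.parallelize(Seq("a", "b", "c", "a"))
  .map(key => lookup.value.getOrElse(key, 0)) // tasks read the broadcast value locally
  .collect()
```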

Optimization of Task serialization

Spark-1.1.0 has a subtle optimization in task submission that divides a task into a common part (the RDD, its dependencies, and the function applied to each partition) and a non-common part (stage ID, partition ID, preferred locations). The common part is serialized and transmitted to Executors as a broadcast variable, while the non-common part is serialized and transmitted separately. This avoids transferring the common part multiple times to the same Executor. See DAGScheduler.submitMissingTasks().

Acknowledgements

Thanks to the data mining and computing team of the Taobao technology department, and to the ODPS team, for their optimization suggestions.