About the author

Zhou Ming is an algorithm development engineer at Qunar.com. He joined Qunar in 2018 and mainly works on recommendation algorithms.


What this article can bring you

  • New to Spark: quickly get a simple, clear picture of what Spark is and what it can be used for; if you deal with big data often, start thinking about how it could fit into your own work.
  • Starting to write Spark: review the basic concepts and principles of Spark, avoid common beginner pitfalls, and develop an awareness of performance tuning.
  • Spark veteran: a casual review of Spark odds and ends.

1. The origin of Spark

Taking a few minutes to review the history of big data technology helps us understand the context in which Spark was born.

The big data technology we talk about today traces back to three papers Google published between 2003 and 2006, the so-called "troika": the distributed file system GFS, the big data distributed computing framework MapReduce, and the NoSQL database system BigTable. In 2006, Doug Cutting, founder of the open-source Lucene project, built Hadoop on the principles described in those papers, chiefly the Hadoop Distributed File System (HDFS) and the big data computing engine MapReduce. Hadoop caused a sensation as soon as it was released, and well-known Internet companies such as Yahoo, Baidu, and Alibaba increasingly adopted it for big data storage and computing.

Early on, programming big data jobs directly with MapReduce was complicated, so Yahoo engineers developed Pig, a scripting language with SQL-like syntax; Pig scripts were compiled into MapReduce programs that ran on Hadoop. But SQL-like is not SQL, and migrating the large existing body of SQL-based work to it was still costly for developers. Facebook therefore released Hive, which supports big data computing with SQL itself: Hive translates SQL statements into MapReduce programs.

In the early days of Hadoop, MapReduce was both the execution engine and the resource scheduling framework, which made it hard to maintain and left its modules bloated. As a result, the resource scheduling system YARN was born in 2012. In the same year, Spark, a memory-based computing framework developed at UC Berkeley's AMPLab, came to prominence. Because MapReduce relies on frequent disk I/O to carry out complex big data computations, its execution is very slow; by then, moreover, memory had broken through its earlier capacity and cost constraints. Memory-based Spark therefore became popular in the industry almost immediately and gradually replaced MapReduce in enterprise applications.

The business scenarios these frameworks address are all batch computing. In the big data world there is another class of scenarios that requires computing over large amounts of data as it is generated, in real time. In 2014, three stream computing frameworks became Apache top-level projects: Storm (millisecond-level response but low throughput), Spark Streaming (second-level response but high throughput), and Flink (millisecond-level response with high throughput), ushering in the era of stream computing.


2. What is MapReduce?

As noted in the previous section, Spark was born with a mission to be faster, higher, and stronger than MapReduce, so it is important to understand what MapReduce is.

To illustrate how MapReduce works, let's start with potato chips: three potatoes enter the assembly line as raw material. The first step on the line is washing; the second is slicing, which turns the potatoes into thin chips of varying sizes; the third is baking. We can regard these three steps as different operators in the Map stage, which produce chips of different sizes. Then comes distribution (Reduce): the chips are first separated by size and then sent to designated lines to be packed into buckets. The whole process is one round of MapReduce.
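
The same idea can be sketched with the classic word count. The snippet below is a minimal, single-machine illustration in plain Scala collections (not the Hadoop API): the Map stage turns raw records into key-value pairs, and the Reduce stage groups pairs by key and aggregates them.

```scala
// Map/Reduce illustrated with plain Scala collections (illustrative only).
val lines = Seq("spark is fast", "mapreduce writes to disk", "spark is popular")

// Map stage: each line is split into words and emitted as (word, 1) pairs,
// just as each potato is washed, sliced, and baked on the line.
val mapped = lines.flatMap(_.split(" ")).map(word => (word, 1))

// Reduce stage: pairs with the same key are grouped together ("chips of the
// same size go to the same bucket") and their counts are summed.
val counts = mapped.groupBy(_._1).map { case (word, ones) => (word, ones.size) }

counts.foreach(println) // e.g. (spark,2), (is,2), ...
```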


3. Historical limitations of MapReduce

Although MapReduce was a big deal when it was first introduced, it had the following drawbacks due to its historical limitations:

  1. Only Map and Reduce operations are supported.
  2. In complex application scenarios, there were a lot of disk I/O operations (memory was expensive at the time) and sort merge operations, which were extremely inefficient. We know that a logically complex calculation translates into multiple MapReduce jobs, and each MapReduce job reads and writes to the disk repeatedly.
  3. Not suitable for iterative computing (such as machine learning, graph computing, etc.), interactive processing (data mining), and stream computing (click log analysis).
  4. MapReduce programming is not flexible enough.

Spark was thus left to answer the following questions:

  1. Can it support a richer set of expressive operators?
  2. Can it avoid repeated disk reads/writes and sort-merge operations?
  3. Can it provide one-stop support for batch, interactive, iterative, and stream computing?
  4. Can it move away from cumbersome MapReduce programming and support APIs in multiple programming languages?

4. Why Spark?

Over this long history, time chose Spark. Let's look at how it solves these historical problems and what its main advantages are.

The first is speed. Speed is Spark's foundation: as big data application scenarios grow more complex, people need a faster big data framework. Tests on Spark's official website show Spark running up to 100 times faster than Hadoop, mainly because Spark's RDD- and DAG-based in-memory computation reduces repeated I/O. At the same time, the shuffle mechanism has been continuously optimized to improve sorting and merging during the shuffle, and Spark SQL can optimize queries automatically.

Second, while MapReduce supports only Map and Reduce operations, Spark offers a rich operator library that meets big data development needs and provides APIs in multiple languages, including Java, Scala, Python, R, and SQL, further expanding Spark's influence.

Third is generality: Spark SQL, Spark Streaming, Spark MLlib, and Spark GraphX cover SQL, stream processing, machine learning, and graph computation in a single stack, so Spark has big ambitions.

Finally, "Runs Everywhere": Spark runs on Hadoop YARN among other cluster managers and supports multiple data sources such as HDFS, HBase, and Hive, which makes migrating from Hadoop to Spark simple.

Perhaps at this point, if you are good at summarizing, you will conclude: Spark is faster than MapReduce because memory-based Spark beats MapReduce, which needs a lot of disk I/O. This is a common misconception when people first encounter Spark. In-memory computation is not unique to Spark; both Spark and MapReduce perform their computations in memory. The difference is that for complex calculations requiring multiple MapReduce rounds, Spark does not need to write intermediate results to disk, thereby reducing I/O. More importantly, how does Spark achieve this? Mainly through its DAG and RDD abstractions. The DAG records the Stages of a Job and the dependencies between parent and child RDDs during execution (don't worry if these concepts are unfamiliar; they are introduced below). With this design, intermediate results can be kept in memory as RDDs and recovered via the DAG lineage, greatly reducing disk I/O. In the era when MapReduce was proposed, memory was far more expensive than disk, so to some extent Spark is a product of its times.
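
A minimal sketch of this point, assuming an existing SparkContext `sc` and a hypothetical tab-separated click log on HDFS: the three transformations below are only recorded in the DAG, no intermediate result touches the disk, and the single action at the end triggers one Job that runs the whole chain.

```scala
val logs     = sc.textFile("hdfs:///logs/click/")                   // RDD of raw lines
val pairs    = logs.map(line => (line.split("\t")(0), 1))           // transformation: (userId, 1)
val filtered = pairs.filter { case (userId, _) => userId.nonEmpty } // transformation (narrow)
val counts   = filtered.reduceByKey(_ + _)                          // transformation with a wide dependency (Shuffle)
counts.take(10).foreach(println)                                    // action: execution starts here
```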


5. Basic Concepts of Spark

Let's start by introducing some basic concepts of Spark:

  • RDD: the Resilient Distributed Dataset, Spark's basic data abstraction for distributed computing; a collection of records with the same structure, partitioned across the nodes of the cluster and processed in parallel.
  • DAG: the directed acyclic graph of RDDs. The DAG records the Stages of a Job and the dependencies between parent and child RDDs during execution.
  • Job: within an Application, the boundary of a Job is defined by an Action; a Job consists of N Transformation operators plus one Action operator.
  • Stage: the computing task a user submits is essentially a DAG of RDDs. If a transformation requires a Shuffle (a wide dependency), the Shuffle splits the DAG into different Stages. Because of the Shuffle, Stages cannot run in parallel with one another: later Stages need the Shuffle output of earlier ones.
  • Task: the unit of computation that processes one partition of an RDD within a Stage. Each Stage consists of a set of concurrent Tasks that run exactly the same logic on different partitions; the total number of Tasks in a Stage is determined by the partition count of the Stage's last RDD (see the sketch below).
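
A small sketch (assuming an existing SparkContext `sc`) that maps these concepts onto code: narrow transformations stay in one Stage, each wide (Shuffle) dependency starts a new Stage, and the Action defines the Job.

```scala
val words  = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 3) // RDD with 3 partitions
val pairs  = words.map(w => (w, 1))    // narrow dependency: stays in the same Stage as `words`
val counts = pairs.reduceByKey(_ + _)  // wide dependency: Shuffle, starts a new Stage
counts.collect()                       // Action: submits one Job containing both Stages
// Each Stage runs one Task per partition of its last RDD.
```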

6. Spark submission process

With some basic concepts behind it, let’s take a look at what steps the system performs when we submit a Spark task.

Each Spark task starts a Driver process. The Driver process startup mode varies with the Spark task submission type. The main differences are as follows:

  1. Local: The Driver process runs locally
  2. Yarn-client: the Driver runs locally
  3. Yarn-cluster: the Driver runs in a cluster (NodeManager)

Second, the Driver process requests resources for the Spark task: it applies to the cluster resource manager (for example, YARN's ResourceManager) for the resources the job needs, chiefly Executor processes and CPU cores. The number of Executors and cores can be specified through the resource parameters set on the Spark job, as in the sketch below.
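
A hedged sketch of how these resource parameters can be declared when building the job (the application name and values are hypothetical); on YARN the same settings correspond to the spark-submit flags --num-executors, --executor-cores, and --executor-memory.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("resource-demo")
  .config("spark.executor.instances", "4") // Executor processes to request
  .config("spark.executor.cores", "2")     // CPU cores per Executor
  .config("spark.executor.memory", "4g")   // memory per Executor
  .getOrCreate()
```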

Third, the Driver process splits the Spark task code into Stages at wide-dependency operators.

Fourth, the Driver process creates a batch of tasks for each Stage.

Finally, these Tasks are distributed to the Executor processes for execution.


7. Spark Web UI

You can use Spark’s Web UI to view the progress of Spark tasks, DAG charts, Job division, Stage division, Executor and Task execution. The Web UI is also a starting point and an important monitoring tool for performance tuning.


8. Spark performance optimization

In actual Spark development we inevitably run into the kinds of problems shown in the figure. Most of them are caused by large data volumes or by poorly performing Spark code, and they show up as various memory overflows and stack overflows. Here are some ideas for optimizing Spark performance.

Idea 1: Eliminate redundant data at its root, from the perspective of the actual business and of code standards. Here are a few examples of reducing data redundancy:

  1. If Spark SQL is used to read data from HDFS for computation, review the WHERE conditions against the actual business scenario to filter out as much invalid data as possible; for example, exclude international data when handling domestic business.
  2. Before a left join, check whether the join key of the right table is unique, to avoid the data exploding after the join (see the sketch after this list).
  3. When you need to apply multiple operators to the same RDD, use an RDD persistence strategy and reuse the same RDD.
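
A hedged Spark SQL sketch for point 2: before the left join, check whether the right table's join key is unique (the table and column names are hypothetical, and `spark` is an existing SparkSession).

```scala
val dupKeys = spark.sql("""
  SELECT order_id, COUNT(*) AS cnt
  FROM right_table
  GROUP BY order_id
  HAVING COUNT(*) > 1
""")
dupKeys.show() // any rows here mean the left join would multiply matching records
```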

The following figure shows several persistence strategies for RDD.

So how do you choose a persistence strategy?

  1. The best-performing level is MEMORY_ONLY, but only if your memory is truly large enough to hold everything.
  2. If memory overflows occur, use the MEMORY_ONLY_SER level: serialization reduces the memory footprint and reduces frequent GC, but it adds serialization and deserialization overhead and does not fundamentally avoid memory overflow.
  3. If neither pure-memory level is workable, use the MEMORY_AND_DISK_SER or MEMORY_AND_DISK policy.
  4. DISK_ONLY (data is read and written entirely from disk files) and the policies with the _2 suffix are generally not recommended. A _2 policy creates a replica of the data and saves it on another node; the network overhead of replicating and transferring the copy hurts performance, although it does provide higher availability.
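
A minimal sketch of reusing one RDD across several actions with a persistence level that spills to disk under memory pressure (`sc` and the input path are assumptions).

```scala
import org.apache.spark.storage.StorageLevel

val events = sc.textFile("hdfs:///logs/events/")
  .map(_.split("\t"))                        // parse each line into fields
  .persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized; spills to disk when memory is tight

val total  = events.count()  // first action: computes the RDD and caches it
val sample = events.take(10) // second action: served from the cache, not recomputed
events.unpersist()           // release the cache when it is no longer needed
```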

Idea 2: Optimize common Spark parameters

Several common optimization parameters are summarized here:
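
As a hedged illustration (the values are placeholders, not recommendations), a few of the parameters that are most often tuned can be set like this:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "8g")         // heap size of each Executor
  .set("spark.executor.cores", "4")           // CPU cores per Executor
  .set("spark.executor.instances", "10")      // number of Executors (on YARN)
  .set("spark.driver.memory", "4g")           // memory of the Driver process
  .set("spark.default.parallelism", "200")    // default partition count for RDD shuffles
  .set("spark.sql.shuffle.partitions", "200") // partition count for Spark SQL shuffles
```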

Idea 3: Data skew is the main cause of slow Spark tasks

What is data skew?

An important performance drain while a Spark job runs is the Shuffle process. During a Shuffle, the data for each key must be pulled from multiple nodes in the cluster to the same node for aggregation or joining; operators such as reduceByKey and join trigger Shuffles. A Shuffle can involve a large number of disk read/write I/O operations and network transfers of data. Moreover, if the distribution of keys in the data is extremely uneven (common in real business scenarios; for example, agents or crawlers generate far more behavior than ordinary users), some Tasks take much longer to execute than others (the Web UI introduced earlier shows each Task's execution time and the amount of data it processed), or a Task may even fail with a memory overflow because of the sheer amount of data it has to process.

Basic solution of data skew

What do we do about data skew? Most problems can be solved by following these steps:

  1. Using the Web UI introduced above, find the abnormally executing Tasks, check their execution time and the amount of data they process, and determine which Stage they belong to. Based on the Stage division rules and the code line numbers shown on the Web UI, you can locate the code where the data skew occurs.
  2. After locating the code segment that causes the skew, analyze it in detail by operator, for example a Shuffle caused by groupBy or join. You can inspect the key distribution of the RDD by counting the occurrences of each key and sorting in descending order (see the sketch after this list).
  3. If a few keys cause the skew and they matter little to the business, simply filter them out. For example, when computing user click behavior you may find that the user with user_name = "0" has hundreds of thousands more clicks than ordinary users; such keys can be filtered out directly. This method is simple to implement and removes the skew at its root, but it applies to few scenarios and may carry business risk.
  4. Increase the parallelism of the Shuffle operation (spark.sql.shuffle.partitions), i.e., increase the number of shuffle-read Tasks, so that keys originally assigned to one Task are spread across several Tasks and each Task processes less data than before. This generally relieves most of the pressure, but it cannot completely eliminate data skew: if the data are extremely unbalanced, say a single key has 100,000 records, then no matter how many Tasks you add, that key's 100,000 records must still be assigned to one Task.
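
A hedged sketch of step 2: sample the pair RDD involved in the skewed Shuffle (here a hypothetical `pairs` RDD of (key, value) records), count each key, and inspect the heaviest keys in descending order.

```scala
val keyCounts = pairs
  .sample(withReplacement = false, fraction = 0.1) // sample to keep the check cheap
  .map { case (k, _) => (k, 1L) }
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)

keyCounts.take(20).foreach(println) // heavily skewed keys appear at the top
```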

Avoid using Shuffle operators

Where possible, avoid Shuffle operators altogether. For example, use a Map Join instead of a Reduce Join (the ordinary join): broadcast the data of the small RDD to every Executor as a Broadcast variable, then perform the join inside a map operation on the other RDD by matching each record's key. No Shuffle happens at all, so Shuffle-induced data skew can never appear. However, this method only applies when a large table joins a small table and the small table is at most a few hundred MB or one to two GB in size.
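
A hedged sketch of such a map join, where `smallRdd` and `largeRdd` are hypothetical (key, value) RDDs and `sc` is an existing SparkContext: collect the small RDD to the Driver, broadcast it to every Executor, and join inside a map-side operation.

```scala
val smallMap = smallRdd.collectAsMap() // the small table must fit in Driver and Executor memory
val smallBc  = sc.broadcast(smallMap)  // one read-only copy shipped to each Executor

val joined = largeRdd.flatMap { case (k, v) =>
  smallBc.value.get(k).map(small => (k, (v, small))) // keep only keys present in the small table
}
```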

Data skew optimization: key salting (hashing) and re-aggregation

The data skew caused by Spark's Shuffle operation is, to some extent, similar to the HBase hotspot problem, so the HBase RowKey salting/hashing design principle can be applied to skew caused by aggregation-type Shuffles. How is it done? First salt the key, usually with a random number or based on the key's specific content, so that one large key is scattered into k smaller keys. Data that would otherwise all be pulled into a single Task for the Shuffle computation is now pulled into k different Tasks, which alleviates the skew caused by a single Task processing too much data. Then remove the salt from the locally aggregated keys and aggregate again to obtain the final result (see the sketch below). This key-salting idea is also widely used for skew caused by join-type Shuffle operators, and likewise for the skew that keyBy causes in Flink, which frequently makes Flink tasks hang.
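
A hedged sketch of this two-stage ("salted") aggregation for a skewed reduceByKey, where `pairs` is a hypothetical (key, count) RDD.

```scala
import scala.util.Random

val salted = pairs.map { case (k, v) =>
  (s"${Random.nextInt(10)}_$k", v) // stage 1: scatter each hot key across 10 salted keys
}
val partial = salted.reduceByKey(_ + _) // local aggregation is now spread over many Tasks

val unsalted = partial.map { case (saltedKey, v) =>
  (saltedKey.split("_", 2)(1), v) // drop the random prefix to restore the original key
}
val result = unsalted.reduceByKey(_ + _) // stage 2: final aggregation on the original keys
```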


9. Summary

What this article mainly covered:

  1. The historical background and mission of Spark
  2. Basic Spark concepts and Web UI
  3. Spark Performance Optimization

What I hope it brings you:

  1. Learn more about Spark
  2. Use Spark to solve actual work problems
  3. A stronger awareness of, and ability in, performance tuning