The Spark kernel refers to Spark's core operating mechanisms: how the core components run, how tasks are scheduled, how memory is managed, and how Spark's core functions work. A solid grasp of these principles helps us design Spark code well and pinpoint the root cause of problems that come up while a project is running.

Spark Kernel Overview

Review of Spark core components

Driver

The Spark Driver node runs the main method of a Spark application and is responsible for executing the actual code. During Spark job execution, the Driver is responsible for the following:

  1. Converting the user program into jobs;
  2. Scheduling tasks across Executors;
  3. Tracking the execution status of Executors;
  4. Displaying the running status of queries through the UI.

Executor

A Spark Executor node is a JVM process responsible for running the individual tasks of a Spark job; the tasks are independent of one another. Executor nodes are started when the Spark application starts and live for the entire life cycle of the application. If an Executor node fails or crashes, the Spark application can continue to run: the tasks on the failed node are rescheduled onto other Executor nodes.

An Executor has two core functions:

  1. Run the tasks that make up the Spark application and return the results to the Driver process.
  2. Provide in-memory storage, through its own Block Manager, for RDDs that the user program asks to cache. RDDs are cached directly inside the Executor process, so tasks can use the cached data to speed up computation at run time.

Overview of the general Spark running process

After an application is submitted, the Driver process starts first and registers the application with the cluster manager. The cluster manager allocates Executors according to the application's configuration and starts them. Once all the resources the Driver requires are in place, the Driver begins executing the main function. Spark evaluates lazily: only when an action operator is reached does the Driver work backwards through the lineage, dividing it into stages at wide dependencies. Each stage corresponds to one TaskSet, and a TaskSet contains multiple tasks. Following the data locality principle, tasks are assigned to specific Executors for execution. While tasks run, the Executors keep reporting task status to the Driver.
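The lazy behaviour described above can be illustrated with a toy model in plain Python. This is only a sketch: `LazyDataset` and its methods are hypothetical names invented for this example, not Spark APIs, and everything runs in a single process rather than on Executors. Transformations merely record what should be done; the work happens only when the action `collect` is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, source, ops=()):
        self._source = source
        self._ops = tuple(ops)  # recorded lineage: (kind, function) pairs

    def map(self, fn):
        # Transformation: nothing is computed, only the lineage grows.
        return LazyDataset(self._source, self._ops + (("map", fn),))

    def filter(self, pred):
        # Transformation: also lazy.
        return LazyDataset(self._source, self._ops + (("filter", pred),))

    def collect(self):
        # Action: replay the recorded lineage over the source data.
        data = list(self._source)
        for kind, fn in self._ops:
            if kind == "map":
                data = [fn(x) for x in data]
            else:
                data = [x for x in data if fn(x)]
        return data


calls = []

def double(x):
    calls.append(x)  # lets us observe when the function really runs
    return x * 2

ds = LazyDataset(range(4)).map(double).filter(lambda x: x > 2)
print(calls)         # still empty: no action has been executed yet
print(ds.collect())  # the action triggers the whole computation
```

Building the pipeline leaves `calls` empty; only `collect()` causes `double` to run.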

Spark deployment mode

Spark supports three cluster managers:

  1. Standalone: Spark's own simple cluster manager. It ships with a complete set of services and can be deployed to a cluster on its own, without relying on any other resource management system.
  2. Apache Mesos: a powerful distributed resource management framework on which many different frameworks can be deployed, including YARN.
  3. Hadoop YARN: a unified resource management mechanism on which multiple computing frameworks, such as MapReduce and Storm, can run. Depending on where the Driver sits in the cluster, it is divided into YARN client mode and YARN cluster mode.

In addition to these general-purpose cluster managers, Spark provides some simple local deployment modes that are convenient for testing and learning. Since most cluster managers used in production are Hadoop YARN, we focus on Spark cluster deployment in Hadoop YARN mode.

The operating mode of Spark depends on the MASTER value passed to SparkContext. Some modes also require auxiliary programming interfaces. The currently supported master strings and URLs include:

| Master URL | Meaning |
| --- | --- |
| local | Runs locally with a single worker thread and no parallelism. |
| local[K] | Runs locally with K worker threads. K is usually set to the number of CPU cores on the machine. |
| local[*] | Runs locally with as many worker threads as the machine has CPU cores. |
| spark://HOST:PORT | Runs in Standalone mode, the cluster mode provided by Spark itself. The default port is 7077. For details, see Spark Standalone Cluster. |
| mesos://HOST:PORT | The Driver and Worker processes run on the Mesos cluster. The deployment mode must be fixed: --deploy-mode cluster. See MesosClusterDispatcher for more documentation. |
| yarn-client | The Driver process runs locally and the Executor processes run on the YARN cluster. The deployment mode must be fixed: --deploy-mode client. The YARN cluster address must be defined in the HADOOP_CONF_DIR or YARN_CONF_DIR variable. |
| yarn-cluster | Both the Driver process and the Executor processes run on the YARN cluster. The deployment mode must be fixed: --deploy-mode cluster. The YARN cluster address must be defined in the HADOOP_CONF_DIR or YARN_CONF_DIR variable. |

When a user submits an application to Spark, the following two parameters jointly determine the operating mode:

  · --master MASTER_URL: determines which cluster the Spark application is submitted to.
  · --deploy-mode DEPLOY_MODE: determines where the Driver runs. The value can be client or cluster.
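To make the master strings listed above more concrete, here is a small, hedged sketch of how such a string could be classified. `describe_master` is a hypothetical helper written for this article; it is not Spark's own parsing code and only covers the forms discussed in this section.

```python
import re

def describe_master(master: str) -> str:
    """Classify a master string into one of the run modes discussed above."""
    if master == "local":
        return "local, worker threads: 1"
    match = re.fullmatch(r"local\[(\d+|\*)\]", master)
    if match:
        count = "one per CPU core" if match.group(1) == "*" else match.group(1)
        return f"local, worker threads: {count}"
    if master.startswith("spark://"):
        return "standalone cluster"
    if master.startswith("mesos://"):
        return "mesos cluster"
    if master in ("yarn-client", "yarn-cluster"):
        # The part after the dash is the deploy mode fixed by this string.
        return "yarn, deploy mode: " + master.split("-")[1]
    raise ValueError(f"unrecognized master string: {master!r}")

for m in ["local", "local[4]", "local[*]", "spark://host:7077", "yarn-cluster"]:
    print(m, "->", describe_master(m))
```

The host name in `spark://host:7077` is a placeholder; only the port 7077 comes from the table.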

Standalone mode running mechanism

A Standalone cluster has four important components:

  1. Driver: a process. The Spark application we write runs on the Driver and is executed by the Driver process.
  2. Master (RM, by analogy with YARN's ResourceManager): a process mainly responsible for scheduling and allocating resources and for monitoring the cluster.
  3. Worker (NM, by analogy with YARN's NodeManager): a process. One Worker runs on each server in the cluster and has two main responsibilities: using its own memory to store one or more partitions of an RDD, and starting other processes and threads (Executors) to process and compute the RDD partitions in parallel.
  4. Executor: a process. Multiple Executors can run on one Worker. An Executor starts multiple threads (tasks) to compute RDD partitions in parallel, that is, to run the operators defined on the RDD such as map, flatMap, and reduce.

Standalone Client mode

In Standalone Client mode, the Driver runs on the local machine from which the application is submitted. After the Driver starts, it registers the application with the Master. Based on the resource requirements in the submit script, the Master finds all Workers that have enough resources to start at least one Executor and then allocates Executors among them. Once the Executors on the Workers have started, they register back with the Driver (reverse registration). After all Executors have registered, the Driver starts executing the main function. When an action operator is reached, the Driver starts dividing stages; each stage generates a TaskSet, whose tasks are then distributed to the Executors for execution.

Standalone Cluster mode

In Standalone Cluster mode, after the application is submitted, the Master picks a Worker on which to start the Driver process. Once started, the Driver registers the application with the Master. Based on the resource requirements in the submit script, the Master finds all Workers that have enough resources to start at least one Executor and then allocates Executors among them. Once the Executors on the Workers have started, they register back with the Driver. After all Executors have registered, the Driver starts executing the main function. When an action operator is reached, the Driver starts dividing stages; each stage generates a TaskSet, whose tasks are then distributed to the Executors for execution.

Note: in both Standalone modes (client and cluster), when the Master receives the Driver's request to register a Spark application, it looks through the remaining resources it manages and collects all Workers that can start at least one Executor. Executors are then distributed among these Workers, and the only thing the distribution considers is whether the resources on a Worker are sufficient. This continues until all Executors required by the application have been allocated. Once the Executors have registered back, the Driver starts executing the main function.
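The allocation rule in the note above can be sketched as a toy model. The `Worker` class, the `allocate_executors` function, and the round-robin order in which Workers are visited are all assumptions made for illustration; this is not the Master's actual code. The only thing taken from the text is that a Worker qualifies solely on whether its remaining resources are enough for one more Executor.

```python
from dataclasses import dataclass

@dataclass
class Worker:
    name: str
    free_cores: int
    free_mem_mb: int

def allocate_executors(workers, num_executors, cores_per_executor, mem_per_executor_mb):
    """Place executors on workers, checking only for sufficient free resources."""
    def fits(w):
        return (w.free_cores >= cores_per_executor
                and w.free_mem_mb >= mem_per_executor_mb)

    usable = [w for w in workers if fits(w)]  # workers able to host >= 1 executor
    placement = []
    while usable and len(placement) < num_executors:
        for w in list(usable):  # iterate over a copy: we may remove from usable
            if len(placement) == num_executors:
                break
            w.free_cores -= cores_per_executor
            w.free_mem_mb -= mem_per_executor_mb
            placement.append(w.name)
            if not fits(w):
                usable.remove(w)  # this worker is now exhausted
    return placement

cluster = [Worker("A", 4, 4096), Worker("B", 2, 2048), Worker("C", 1, 512)]
print(allocate_executors(cluster, 3, cores_per_executor=2, mem_per_executor_mb=1024))
```

Worker C never qualifies because it cannot host even one Executor of the requested size. If fewer Executors can be placed than requested, the sketch simply returns the shorter placement list.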

YARN mode running mechanism

YARN Client mode

In YARN Client mode, the Driver runs on the local machine from which the application is submitted. After the Driver starts, it contacts ResourceManager and asks it to start ApplicationMaster. ResourceManager allocates a Container and starts ApplicationMaster on a suitable NodeManager. In this mode ApplicationMaster acts only as an ExecutorLauncher: its sole job is to request Executor memory from ResourceManager.

After receiving ApplicationMaster's resource request, ResourceManager allocates Containers, and ApplicationMaster starts the Executor processes on the NodeManagers that ResourceManager specifies. Once an Executor process has started, it registers back with the Driver. After all Executors have registered, the Driver starts executing the main function. When an action operator is reached, a job is triggered and stages are divided at wide dependencies. Each stage generates a TaskSet, whose tasks are then distributed to the Executors for execution.

YARN Cluster mode

In YARN Cluster mode, after the application is submitted, the client contacts ResourceManager and asks it to start ApplicationMaster. ResourceManager allocates a Container and starts ApplicationMaster on a suitable NodeManager. In this mode the ApplicationMaster is the Driver.

After the Driver starts, it requests Executor memory from ResourceManager. On receiving the request, ResourceManager allocates Containers, and Executor processes are started on suitable NodeManagers. Once an Executor process has started, it registers back with the Driver. After all Executors have registered, the Driver starts executing the main function. When an action operator is reached, a job is triggered and stages are divided at wide dependencies. Each stage generates a TaskSet, whose tasks are then distributed to the Executors for execution.

Spark Communication Architecture

Spark Communication architecture Overview

Spark 2.x uses the Netty communication framework as its internal communication component. Spark's new Netty-based RPC framework borrows from Akka's design and is based on the Actor model, as shown below:

Each component (Client, Master, and Worker) in the Spark communication framework can be regarded as an independent entity, and the entities communicate with each other through messages. The relationship between the components is shown as follows:

An Endpoint (Client, Master, or Worker) has 1 InBox and N OutBoxes (N >= 1). N depends on how many other Endpoints the current Endpoint communicates with: each Endpoint it talks to has a corresponding OutBox. Messages received by the Endpoint are written to the InBox, and outgoing messages are written to an OutBox and then sent to the InBox of the other Endpoint.
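The one-InBox, N-OutBox layout described above can be mimicked with a few lines of Python. This is a toy model under stated assumptions: the `Endpoint` class, its `send` and `flush` methods, and the shared `network` dictionary are hypothetical and stand in for the real Netty transport; the message strings are arbitrary.

```python
from collections import defaultdict, deque

class Endpoint:
    """Toy endpoint with one inbox and one outbox per communication partner."""

    def __init__(self, name, network):
        self.name = name
        self.inbox = deque()                # the single InBox
        self.outboxes = defaultdict(deque)  # target name -> its own OutBox
        self.network = network              # stand-in for the transport layer
        network[name] = self

    def send(self, target, message):
        # Messages addressed to ourselves go to the inbox, others to an outbox.
        if target == self.name:
            self.inbox.append(message)
        else:
            self.outboxes[target].append(message)

    def flush(self):
        # Deliver every queued message to the inbox of its target endpoint.
        for target, outbox in self.outboxes.items():
            while outbox:
                self.network[target].inbox.append(outbox.popleft())

network = {}
master = Endpoint("master", network)
worker = Endpoint("worker", network)
client = Endpoint("client", network)

worker.send("master", "register")
worker.send("client", "status")
worker.flush()
print(len(worker.outboxes), list(master.inbox), list(client.inbox))
```

The worker talks to two other endpoints, so it ends up with two outboxes, while each endpoint keeps exactly one inbox.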

Spark communication architecture analysis

The Spark communication architecture is shown as follows:

  1. RpcEndpoint: an RPC endpoint. Spark calls each node (Client, Master, Worker) an RPC endpoint. Each one implements the RpcEndpoint interface and defines its own messages and handling logic according to its needs. To send (ask) a message, it calls the Dispatcher.
  2. RpcEnv: the RPC context. The environment that each RPC endpoint runs in is called the RpcEnv.
  3. Dispatcher: the message dispatcher. It routes messages that an RPC endpoint needs to send, and messages received from remote RPC endpoints, to the corresponding inbox or outbox. If the receiver of a message is the endpoint itself, the message goes into the inbox; otherwise it goes into the outbox.
  4. Inbox: the inbox for incoming messages. Each local RpcEndpoint has one Inbox. Every time the Dispatcher stores a message in an Inbox, it adds the corresponding EndpointData to its internal ReceiverQueue. When the Dispatcher is created, it also starts a separate thread that polls the ReceiverQueue and consumes the inbox messages.
  5. RpcEndpointRef: a reference to a remote RpcEndpoint. To send a message to a particular RpcEndpoint, we normally obtain a reference to it first and then send the message through that reference.
  6. OutBox: the outbox for outgoing messages. For the current RpcEndpoint, each target RpcEndpoint has its own OutBox, so sending to multiple targets means multiple OutBoxes. Once a message is placed in an OutBox, it is sent out through the TransportClient. Placing the message in the OutBox and sending it happen in the same thread.
  7. RpcAddress: the address of a remote RpcEndpointRef, that is, host + port.
  8. TransportClient: the Netty communication client. Each OutBox has one TransportClient. The TransportClient keeps polling the OutBox and, based on the receiver information of each OutBox message, sends requests to the corresponding remote TransportServer.
  9. TransportServer: the Netty communication server. Each RpcEndpoint has one TransportServer. After receiving a remote message, it calls the Dispatcher to route the message to the corresponding inbox or outbox.

Based on the above analysis, a high-level view of the Spark communication architecture is shown below:

Spark task scheduling mechanism

In production, Spark clusters are usually deployed in yarn-cluster mode. The kernel analysis that follows therefore assumes yarn-cluster as the deployment mode.

Spark task submission process

The sequence diagram below illustrates the complete flow of a Spark application from submission to execution:

To submit a Spark Application, the Client first asks ResourceManager to start an Application and checks whether there are enough resources to meet the Application's requirements. If there are, it prepares the launch context for ApplicationMaster, hands it to ResourceManager, and then monitors the Application status periodically.

When the queue the application was submitted to has resources available, ResourceManager starts ApplicationMaster on a NodeManager. ApplicationMaster starts the Driver in a separate background thread. Once the Driver is up, ApplicationMaster connects to it through local RPC and begins requesting Container resources from ResourceManager in order to run Executor processes (one Executor per Container). When ResourceManager returns Container resources, ApplicationMaster starts Executors in the corresponding Containers.

The Driver thread mainly initializes the SparkContext object and prepares the context needed to run. It then, on the one hand, keeps its RPC connection with ApplicationMaster and requests resources through it, and on the other hand starts scheduling tasks according to the user's business logic, delivering tasks to idle Executors that are already available.

When ResourceManager returns Container resources to ApplicationMaster, ApplicationMaster tries to start an Executor process in the corresponding Container. After the Executor process starts, it registers back with the Driver. Once registration succeeds, the Executor maintains a heartbeat with the Driver and waits for the Driver to distribute tasks. When a distributed task finishes, the Executor reports the task status to the Driver.

As you can see from the sequence diagram above, the Client is only responsible for submitting the Application and monitoring its status. Task scheduling for Spark focuses on two aspects: resource application and task distribution. These tasks are implemented between ApplicationMaster, Driver, and Executor.

Overview of Spark task scheduling

Once the Driver is up, it prepares tasks according to the logic of the user program and distributes them step by step as Executor resources allow. Before explaining task scheduling in detail, let us first review a few Spark concepts. A Spark application involves three concepts, Job, Stage, and Task:

  1. A Job is delimited by action methods: each action method encountered triggers one Job.
  2. A Stage is a subset of a Job and is delimited by RDD wide dependencies (shuffles): each shuffle encountered introduces one stage boundary.
  3. A Task is a subset of a Stage. The number of tasks is determined by the parallelism (the number of partitions).

Spark task scheduling happens at two levels: Stage-level scheduling and Task-level scheduling. The overall scheduling process is as follows:

Through their transformation operations, Spark RDDs form the RDD lineage graph, that is, the DAG. Finally, calling an action triggers a Job, which is then scheduled for execution. DAGScheduler is responsible for Stage-level scheduling: it mainly divides a Job into several Stages and packages each Stage into a TaskSet, which it hands to TaskScheduler. TaskScheduler is responsible for Task-level scheduling: it dispatches the TaskSets received from DAGScheduler to Executors according to the specified scheduling policy. During scheduling, SchedulerBackend provides the available resources. SchedulerBackend has multiple implementations, each connecting to a different resource management system. With this in mind, the following figure shows the interaction between the internal modules of ApplicationMaster, Driver, and Executor during task scheduling in Spark-on-YARN mode:

When the Driver initializes SparkContext, it initializes DAGScheduler, TaskScheduler, SchedulerBackend, and HeartbeatReceiver, and starts SchedulerBackend and HeartbeatReceiver. SchedulerBackend requests resources through ApplicationMaster and continuously fetches suitable tasks from TaskScheduler and dispatches them to Executors for execution. HeartbeatReceiver receives heartbeats from Executors, monitors their health, and notifies TaskScheduler.

Spark stage-level scheduling

Spark's task scheduling starts with cutting the DAG, which is mainly done by DAGScheduler. When an action is encountered, it triggers the computation of a Job, which is handed to DAGScheduler for submission. The following is the flow chart of the method calls involved in Job submission.

A Job is formed by the final RDD together with the action method. SparkContext submits the Job to DAGScheduler, which splits it into several Stages based on the DAG formed by the RDD lineage. The division strategy is as follows: starting from the final RDD, DAGScheduler backtracks through the dependencies and checks whether each parent dependency is a wide dependency. In other words, Stages are divided with shuffles as the boundaries. RDDs connected by narrow dependencies are placed in the same Stage and can be computed in a pipelined fashion, as shown by the purple flow in the figure above. There are two types of Stage. One is the ResultStage, the most downstream Stage of the DAG, which is determined by the action method. The other is the ShuffleMapStage, which prepares data for downstream Stages.

Consider an example. A Job is triggered by saveAsTextFile, so the Job consists of RDD-3 and the saveAsTextFile method. Following the dependencies between RDDs, the search backtracks from RDD-3 all the way to RDD-0. During backtracking, RDD-3 depends on RDD-2 through a wide dependency, so a Stage boundary is drawn between RDD-2 and RDD-3, and RDD-3 falls into the last Stage, the ResultStage. RDD-2 depends on RDD-1 and RDD-1 depends on RDD-0, and both are narrow dependencies, so RDD-0, RDD-1, and RDD-2 are placed in the same Stage, a ShuffleMapStage. During actual execution, each data record goes through the transformations from RDD-0 to RDD-2 in one pass. It is not hard to see that this is essentially a depth-first search algorithm.
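The backtracking described above can be sketched in a few lines of Python. This is a simplified illustration, not DAGScheduler's code: `split_stages` and the dictionary-based lineage format are assumptions made for this example, and a shuffle parent shared by several Stages is not deduplicated.

```python
def split_stages(final_rdd, parents):
    """Split a lineage into stages, cutting at wide (shuffle) dependencies.

    parents maps an RDD name to a list of (parent_name, dependency_type),
    where dependency_type is "narrow" or "wide".
    Returns the stages in execution order, upstream stages first.
    """
    stages = []

    def build_stage(root):
        members, shuffle_parents = [], []
        stack, seen = [root], {root}
        while stack:  # depth-first walk back through narrow dependencies
            rdd = stack.pop()
            members.append(rdd)
            for parent, dep in parents.get(rdd, []):
                if dep == "wide":
                    shuffle_parents.append(parent)  # stage boundary
                elif parent not in seen:
                    seen.add(parent)
                    stack.append(parent)
        for parent in shuffle_parents:
            build_stage(parent)  # upstream stages are recorded first
        stages.append(sorted(members))

    build_stage(final_rdd)
    return stages

# The example from the text: RDD-3 <-(wide)- RDD-2 <-(narrow)- RDD-1 <-(narrow)- RDD-0
lineage = {
    "rdd3": [("rdd2", "wide")],
    "rdd2": [("rdd1", "narrow")],
    "rdd1": [("rdd0", "narrow")],
}
print(split_stages("rdd3", lineage))
```

For this lineage the sketch yields two stages: one containing rdd0, rdd1, and rdd2 (the ShuffleMapStage in the text) followed by one containing only rdd3 (the ResultStage).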

Before a Stage is submitted, DAGScheduler checks whether its parent Stages have been executed. A Stage can be submitted only after its parent Stages have finished; if a Stage has no parent Stage, submission starts from that Stage. When a Stage is submitted, its task information (partition information, the method to run, and so on) is serialized and packaged into a TaskSet, which is handed to TaskScheduler. Each partition corresponds to one Task. DAGScheduler, for its part, monitors the running status of the Stage: only when an Executor is lost or a Task fails because of a fetch failure does the failed Stage need to be resubmitted so that the failed tasks are scheduled again. Other kinds of Task failure are retried within TaskScheduler's own scheduling process.
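The "parents first" submission rule can be illustrated with a short recursive sketch. It is deliberately simplified: `run_job` is a hypothetical function, stages are plain strings, and each Stage is treated as finishing the moment it is submitted, whereas the real scheduler waits for completion events.

```python
def run_job(final_stage, parent_stages):
    """Return the order in which stages get submitted, parents before children.

    parent_stages maps a stage name to the list of its parent stage names.
    """
    finished = set()
    order = []

    def submit(stage):
        missing = [p for p in parent_stages.get(stage, []) if p not in finished]
        for parent in missing:
            submit(parent)           # unfinished parents are submitted first
        if stage not in finished:    # guard against shared parents
            order.append(stage)      # all parents done: submit this stage
            finished.add(stage)      # toy assumption: it completes immediately

    submit(final_stage)
    return order

stage_graph = {"result": ["map_a", "map_b"], "map_b": ["map_c"]}
print(run_job("result", stage_graph))
```

A Stage without parents (here map_a and map_c) is where submission actually begins, which mirrors the rule stated in the text.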

DAGScheduler's job is relatively simple: it only divides the DAG at the Stage level, submits Stages, and monitors the related status information. TaskScheduler is more complex and is explained in detail below.

Spark Task-level scheduling

The Spark Task is scheduled by the TaskScheduler. DAGScheduler packages the Stage into the TaskSet and gives it to the TaskScheduler. The TaskScheduler encapsulates the TaskSet as TaskSetManager and adds it to the scheduling queue. The TaskSetManager structure is shown below.

The TaskSetManager is responsible for monitoring and managing Tasks in the same Stage. The TaskScheduler uses the TaskSetManager as a unit to schedule Tasks.

As mentioned earlier, after TaskScheduler is initialized it starts SchedulerBackend, which deals with the outside world: it receives Executor registrations and maintains Executor state. SchedulerBackend is therefore the one holding the "provisions", that is, the available resources. Once started, it also periodically "asks" TaskScheduler whether there are tasks to run, in effect saying: "I have this much spare capacity, do you want it?" Whenever SchedulerBackend asks, TaskScheduler selects a TaskSetManager from the scheduling queue according to the specified scheduling policy and schedules it to run. The general method call flow is as follows:

After the TaskSetManager is added to the rootPool scheduling pool, the reviveOffers method of SchedulerBackend is called, which sends a ReviveOffers message to driverEndpoint. When driverEndpoint receives the ReviveOffers message, it calls the makeOffers method to filter out the active Executors (the Executors that registered back with the Driver when the application started) and wraps each of them in a WorkerOffer object. Once the WorkerOffers are ready, taskScheduler calls resourceOffers to assign tasks to Executors based on those resources.
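The offer flow above can be approximated with a small model. Everything here is an assumption for illustration: the field names of `WorkerOffer`, the dictionary describing Executors, and the round-robin assignment in `resource_offers` are not taken from Spark's source, and scheduling policy and locality are ignored.

```python
from dataclasses import dataclass

@dataclass
class WorkerOffer:
    executor_id: str
    host: str
    cores: int  # free cores offered by this executor

def make_offers(executors):
    """Keep only active executors with free cores and wrap them as offers."""
    return [
        WorkerOffer(executor_id, info["host"], info["free_cores"])
        for executor_id, info in executors.items()
        if info["alive"] and info["free_cores"] > 0
    ]

def resource_offers(offers, pending_tasks, cpus_per_task=1):
    """Assign pending tasks to offers, one task per offer per round."""
    assignments = []
    tasks = list(pending_tasks)
    progressed = True
    while tasks and progressed:
        progressed = False
        for offer in offers:
            if tasks and offer.cores >= cpus_per_task:
                offer.cores -= cpus_per_task
                assignments.append((tasks.pop(0), offer.executor_id))
                progressed = True
    return assignments

executors = {
    "e1": {"host": "h1", "free_cores": 2, "alive": True},
    "e2": {"host": "h2", "free_cores": 1, "alive": True},
    "e3": {"host": "h3", "free_cores": 4, "alive": False},  # lost executor
}
offers = make_offers(executors)
print(resource_offers(offers, ["t0", "t1", "t2", "t3"]))
```

With three free cores in total, only three of the four tasks are placed; the remaining task would wait for the next round of offers.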

Scheduling policy

TaskScheduler wraps each TaskSet it receives from DAGScheduler in a TaskSetManager and places it in the task queue. TaskSetManagers are then taken from the queue according to certain rules and run on the Executors provided by SchedulerBackend. This scheduling is actually coarse-grained: its unit is the TaskSetManager. TaskScheduler manages the task queue as a tree. Nodes in the tree are of type Schedulable; leaf nodes are TaskSetManagers and non-leaf nodes are Pools. The following figure shows the inheritance relationship between them.

TaskScheduler supports two scheduling strategies, FIFO, which is the default, and FAIR. During TaskScheduler initialization, the TaskScheduler instantiates rootPool, which represents the root node of the tree and is of Pool type.

FIFO scheduling policy

With the FIFO scheduling policy, TaskSetManagers are simply added to the queue on a first-come, first-served basis, and the TaskSetManager at the head of the queue is taken out first. The TaskSetManagers are stored in a FIFO queue as shown in the following figure.

FAIR Scheduling Policy

The tree structure of FAIR scheduling policy is as follows:

FAIR mode has one rootPool and multiple child Pools, and each child Pool stores the TaskSetManagers waiting to be allocated. In FAIR mode, the child Pools are sorted first and then the TaskSetManagers inside each child Pool are sorted. Because both Pool and TaskSetManager inherit from Schedulable, they use the same sorting algorithm. The comparison is based on fair share. Every object being sorted has three attributes: runningTasks (the number of running tasks), minShare, and weight, and all three are taken into account in the comparison. Note that the minShare and weight values are specified in the fair scheduling configuration file fairscheduler.xml, which the scheduling pool reads when it is constructed.

  1. If A's runningTasks is greater than its minShare and B's runningTasks is less than its minShare, then B is placed ahead of A.
  2. If both A's and B's runningTasks are less than their minShare, compare the ratio of runningTasks to minShare (the minShare usage ratio); the one with the lower ratio comes first.
  3. If both A's and B's runningTasks are greater than their minShare, compare the ratio of runningTasks to weight (the weight usage ratio); the one with the lower ratio comes first.
  4. If all of the above comparisons are equal, compare the names.

Overall, the comparison can be steered through the two parameters minShare and weight, so that objects with a lower minShare usage ratio or weight usage ratio (that is, those actually running fewer tasks) run first.

After sorting in FAIR mode, all TaskSetManagers are put into an ArrayBuffer, from which they are taken in order and sent to Executors for execution. Since a TaskSetManager encapsulates all tasks of a Stage and is responsible for managing and scheduling them, the next step is for the TaskSetManager to hand tasks one by one, according to certain rules, to TaskScheduler, which passes them to SchedulerBackend to be sent to Executors for execution.
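The four comparison rules above translate almost directly into a comparator. The sketch below is written from the rules as stated in the text; the `Schedulable` dataclass and its field names are assumptions for this example, and weight is assumed to be positive.

```python
from dataclasses import dataclass
from functools import cmp_to_key

@dataclass
class Schedulable:
    name: str
    running_tasks: int
    min_share: int
    weight: int

def fair_compare(a, b):
    """Return a negative number if a should be scheduled before b."""
    a_needy = a.running_tasks < a.min_share
    b_needy = b.running_tasks < b.min_share
    if a_needy and not b_needy:
        return -1  # rule 1: the one below its minShare goes first
    if b_needy and not a_needy:
        return 1
    if a_needy and b_needy:
        # rule 2: compare minShare usage (min_share > 0 here, since needy)
        key_a = a.running_tasks / a.min_share
        key_b = b.running_tasks / b.min_share
    else:
        # rule 3: compare weight usage
        key_a = a.running_tasks / a.weight
        key_b = b.running_tasks / b.weight
    if key_a != key_b:
        return -1 if key_a < key_b else 1
    # rule 4: fall back to comparing names
    return (a.name > b.name) - (a.name < b.name)

pools = [
    Schedulable("a", running_tasks=5, min_share=2, weight=1),
    Schedulable("b", running_tasks=1, min_share=2, weight=1),
    Schedulable("c", running_tasks=0, min_share=3, weight=1),
    Schedulable("d", running_tasks=6, min_share=2, weight=3),
]
ordered = sorted(pools, key=cmp_to_key(fair_compare))
print([p.name for p in ordered])
```

Pools c and b are still below their minShare, so they come first (c has the lower usage ratio). Among the rest, d runs more tasks than a but has three times the weight, so it is placed ahead of a.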

Locality scheduling

DAGScheduler cuts the Job, divides it into Stages, and submits the tasks of a Stage by calling submitStage, which in turn calls submitMissingTasks. submitMissingTasks determines the preferredLocations of every task that needs to be computed, obtaining the preferred locations of each partition by calling getPreferredLocations(). Since one partition corresponds to one task, the preferred locations of the partition are the preferred locations of the task. So for each task in the TaskSet submitted to TaskScheduler, the preferred locations are the same as those of its partition.

Once a TaskSetManager has been taken from the scheduling queue, the next step is for the TaskSetManager to hand tasks one by one, according to certain rules, to TaskScheduler, which then passes them to SchedulerBackend to be sent to Executors for execution. As mentioned earlier, a TaskSetManager encapsulates all tasks of a Stage and is responsible for managing and scheduling them.

Based on the preferred locations of each task, Spark determines the task's locality level. There are five locality levels, listed here from highest to lowest priority:

| Name | Description |
| --- | --- |
| PROCESS_LOCAL | Process-local: the task and its data are in the same Executor. This gives the best performance. |
| NODE_LOCAL | Node-local: the task and its data are on the same node but not in the same Executor, so data must be transferred between processes. |
| NO_PREF | The task has no location preference: the data can be fetched equally well from anywhere. |
| RACK_LOCAL | Rack-local: the task and its data are on two different nodes in the same rack, so data must be transferred between nodes over the network. |
| ANY | The task and its data can be anywhere in the cluster and are not in the same rack. This gives the worst performance. |

When scheduling a task, Spark always tries to start it at the highest locality level first. If a task is to be started at locality level X but none of the nodes corresponding to that level has free resources, the launch fails. Spark does not lower the locality level immediately; instead, for a certain period of time it keeps trying to start the task at level X. Only when that time limit is exceeded does it downgrade and try the next locality level, and so on.

Raising the maximum wait time allowed for each level gives the corresponding Executor a chance to free up resources during the wait and run the task, which can improve performance to some extent.
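The downgrade-after-waiting behaviour can be sketched as a small function. This is a simplified model under stated assumptions: `allowed_level` is a hypothetical helper, the 3-second waits are example values, NO_PREF is left out for brevity, and the waits are simply accumulated, whereas a real scheduler also has to track when tasks were last launched.

```python
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]

def allowed_level(seconds_waited, wait_per_level):
    """Return the loosest locality level a task may be launched at right now.

    wait_per_level maps a level name to how long (in seconds) the scheduler
    keeps retrying at that level before falling back to the next one.
    """
    index = 0
    remaining = seconds_waited
    while index < len(LEVELS) - 1:
        wait = wait_per_level.get(LEVELS[index], 0)
        if remaining < wait:
            break              # still within the wait budget of this level
        remaining -= wait      # budget used up: downgrade one level
        index += 1
    return LEVELS[index]

waits = {"PROCESS_LOCAL": 3, "NODE_LOCAL": 3, "RACK_LOCAL": 3}
for t in (0, 2.9, 3, 7, 9):
    print(t, allowed_level(t, waits))
```

A larger wait keeps tasks at the better locality level for longer, at the cost of leaving them unscheduled while they wait.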

Failure retry and blacklist mechanism

Besides choosing suitable Tasks to schedule, TaskScheduler also has to monitor the execution status of Tasks. As mentioned earlier, it is SchedulerBackend that deals with the outside world. After a Task is submitted to an Executor and starts running, the Executor reports the execution status to SchedulerBackend. SchedulerBackend then tells TaskScheduler, which finds the TaskSetManager that the Task belongs to and notifies it. In this way the TaskSetManager knows which Tasks have failed and which have succeeded. If the number of failures of a Task has not exceeded the maximum number of retries, the Task is put back into the pool of Tasks waiting to be scheduled. Otherwise, the entire Application fails.

When recording the number of failures of a Task, the system also records the Executor Id and Host on which the Task last failed. The next time the Task is scheduled, the blacklist mechanism uses this record to avoid placing the Task on the node where it last failed. The blacklist records the Executor Id and Host of the last failure together with a corresponding blocking time, which is the period during which the Task must not be scheduled on that node again.
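The retry counter and the blacklist can be modelled together in a few lines. This is a sketch under stated assumptions: `TaskFailureTracker`, its method names, and the example limits are invented for illustration, time is passed in explicitly to keep the example deterministic, and the sketch aborts once the failure count reaches the configured maximum.

```python
class TaskFailureTracker:
    """Toy model of per-task failure counting plus a time-limited blacklist."""

    def __init__(self, max_failures, blacklist_seconds):
        self.max_failures = max_failures
        self.blacklist_seconds = blacklist_seconds
        self.failures = {}   # task id -> number of failures so far
        self.blacklist = {}  # task id -> (executor id, host, expiry time)

    def record_failure(self, task_id, executor_id, host, now):
        # Count the failure and remember where the task failed last.
        self.failures[task_id] = self.failures.get(task_id, 0) + 1
        self.blacklist[task_id] = (executor_id, host, now + self.blacklist_seconds)
        if self.failures[task_id] < self.max_failures:
            return "retry"   # back into the pool of pending tasks
        return "abort"       # too many failures: the application fails

    def can_run_on(self, task_id, executor_id, host, now):
        entry = self.blacklist.get(task_id)
        if entry is None:
            return True
        bad_executor, bad_host, expiry = entry
        if now >= expiry:
            return True      # the blocking time has passed
        return executor_id != bad_executor and host != bad_host

tracker = TaskFailureTracker(max_failures=2, blacklist_seconds=10)
print(tracker.record_failure("t1", "e1", "h1", now=0))
print(tracker.can_run_on("t1", "e1", "h1", now=5))
print(tracker.can_run_on("t1", "e2", "h2", now=5))
print(tracker.can_run_on("t1", "e1", "h1", now=10))
print(tracker.record_failure("t1", "e2", "h2", now=12))
```

While the blocking time is running, the task is kept away from both the Executor and the Host of its last failure; afterwards it may be scheduled there again.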