1. Preface

We already know that during SparkContext preparation, a RegisterApplication message is sent to the Master to register the Spark application

Let's revisit the cluster-mode submission process

  1. After the task is submitted, the Master (resource manager) finds a Worker (node) on which to start the Driver process, and the Driver registers the application with the Master
  2. The Master selects the appropriate Workers according to the resource configuration in the submit script
  3. Executors are started on those Workers, and each Executor registers itself back with the Driver
  4. The Driver runs the main function; when an action operator is executed, it divides the job into stages, each stage generates a TaskSet, and the tasks are then distributed to the Executors

Now that the source code for Step 1 has been fully traced, let's move on to the Master's resource allocation in Step 2

2. Master resource allocation 01

We know from the resource-layer source code analysis that when the Master receives a message, it matches it against the handling logic in its receive() method. Let's go straight to the point: open Master.receive() and look for the handling logic of the RegisterApplication message
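For orientation, Master.receive() is one big pattern match over incoming messages; a heavily trimmed sketch (most cases omitted, bodies elided) looks roughly like this:

// Trimmed sketch of the dispatch in Master.receive(); only the shape matters here
override def receive: PartialFunction[Any, Unit] = {
  case ElectedLeader =>
    // leader election / recovery handling
  case RegisterApplication(description, driver) =>
    // the case we follow in this article, explained below
  case _ =>
    // heartbeats, executor and driver state changes, and so on
}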

Let’s briefly explain the code logic

if (state == RecoveryState.STANDBY) {
  // The current Master is in STANDBY state, so the message is ignored
  // ignore, don't send response
} else {
  // Encapsulate the app and driver information into an ApplicationInfo
  val app = createApplication(description, driver)
  // Add the app to the collections maintained by the Master (including waitingApps)
  registerApplication(app)
  // Persist the app so it can be recovered after a Master failover
  persistenceEngine.addApplication(app)
  // Send a RegisteredApplication message back to the Driver
  driver.send(RegisteredApplication(app.id, self))
  // Allocate resources to the waiting applications
  schedule()
}

The schedule() code was already explained in the previous article, Spark source code 03 - Submit submission process and Driver start process

private def schedule(): Unit = {

    // Find the Workers that are still alive, in random order
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0

    // 1. Driver-related scheduling
    // Iterate over the Drivers in the waiting collection
    for (driver <- waitingDrivers.toList) {
      var launched = false
      var numWorkersVisited = 0
      // Find a suitable Worker
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          // Start the Driver
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    // 2. Worker-related scheduling (Executor allocation)
    startExecutorsOnWorkers()
  }

Since we have already looked at Driver scheduling, let's go straight to the startExecutorsOnWorkers() method. The method is relatively easy to follow because the comments explain clearly what the code does

Let’s continue with the logic of the code

private def startExecutorsOnWorkers(): Unit = {

  // Iterate over the apps waiting for allocation, as recorded by the Master
  for (app <- waitingApps) {
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) // Cores to allocate per Executor

    if (app.coresLeft >= coresPerExecutor) {
      // Filter out unusable Workers: keep those whose free memory and free cores
      // meet the requirements, then sort by free cores in descending order
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor)
        .sortBy(_.coresFree).reverse
      // Compute the number of cores to assign on each Worker
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Notify the Workers to allocate resources to the Executors
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}

From the code above, the method we need to focus on is scheduleExecutorsOnWorkers(), which computes the resource allocation for the app. Let's look at it next; because the method is fairly long, we will ignore the internally defined canLaunchExecutor() method for now
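For reference, the method's signature looks like this in the version the article appears to follow; it returns, for each usable Worker, the number of cores to assign on that Worker:

private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int]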

At the top of the scheduleExecutorsOnWorkers() method, the following properties are defined

val coresPerExecutor = app.desc.coresPerExecutor // Cores per Executor, as specified by the user (an Option)
val minCoresPerExecutor = coresPerExecutor.getOrElse(1) // Defaults to 1 if not specified
val oneExecutorPerWorker = coresPerExecutor.isEmpty // If not specified, one Executor per Worker grabs all the cores it can
val memoryPerExecutor = app.desc.memoryPerExecutorMB // Memory to allocate per Executor
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Cores assigned on each Worker
val assignedExecutors = new Array[Int](numUsable) // Executors assigned on each Worker
// If the Workers have enough free cores, take the total cores the app still needs; otherwise take what is free
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
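As a tiny illustration of the last line (the numbers here are made up, not from the example that follows): if the application still needs 10 cores but the usable Workers only have 4 + 3 cores free, only 7 cores can be assigned in this round.

// Made-up numbers, just to illustrate the math.min above
val appCoresLeft = 10
val workersCoresFree = Seq(4, 3)
val coresToAssign = math.min(appCoresLeft, workersCoresFree.sum) // = 7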

The method also contains a three-level nested loop

// Filter the usable Workers through the canLaunchExecutor() method
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) { // layer 1: keep going while there are still free Workers
  freeWorkers.foreach { pos => // layer 2: iterate over each free Worker
    var keepScheduling = true
    // Allocate on this Worker
    while (keepScheduling && canLaunchExecutor(pos)) { // layer 3
      coresToAssign -= minCoresPerExecutor
      assignedCores(pos) += minCoresPerExecutor

      // One Executor takes all of the Worker's cores
      if (oneExecutorPerWorker) {
        assignedExecutors(pos) = 1
      } else {
        // Each Executor takes part of the Worker's cores
        assignedExecutors(pos) += 1
      }

      // Horizontal (spread-out) or vertical allocation
      if (spreadOutApps) {
        keepScheduling = false
      }
    }
  }
  freeWorkers = freeWorkers.filter(canLaunchExecutor)
}

Now let's take a look at the internally defined canLaunchExecutor() method. Its comment in the source tells us what it does: it returns whether the specified Worker can launch an Executor for this application. Note that the method is evaluated for one Worker at a time

As can be seen from canLaunchExecutor(), it checks whether the Worker's memory and cores meet the requirements. Let's walk through the code logic below

def canLaunchExecutor(pos: Int): Boolean = {
  // Whether the cores still to be assigned are at least the minimum cores per Executor
  val keepScheduling = coresToAssign >= minCoresPerExecutor
  // Whether the Worker's free cores minus the cores already assigned are at least the minimum cores per Executor
  val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

  // We are launching a new Executor if each Worker may hold several Executors,
  // or if this Worker has not been assigned one yet
  val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
  if (launchingNewExecutor) {
    // Memory already assigned on this Worker
    val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
    // Whether there is still enough free memory for one more Executor
    val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
    // Whether the application's Executor limit would be exceeded
    val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit

    keepScheduling && enoughCores && enoughMemory && underLimit
  } else {
    // We're adding cores to an existing executor, so no need
    // to check memory and executor limits
    keepScheduling && enoughCores
  }
}

3. Master resource allocation 02

After reading the code explanation above, many readers are probably, like the author once was, rather confused: how exactly does the Master divide up resources? Let's use a practical example to explain the Master's resource allocation

spark-submit \
  --master spark://127.0.0.1:7077 \
  --deploy-mode cluster \
  --driver-memory 1g \
  --class org.apache.spark.examples.SparkPi \
  # This article focuses on the following three parameters:
  --executor-cores 4 \          # each Executor uses 4 cores
  --total-executor-cores 12 \   # indirectly specifies 12 / 4 = 3 Executors
  --executor-memory 4g \        # each Executor uses 4g of memory
  ${SPARK_HOME}/examples/jars/spark-examples.jar

In the submit script above, we focus on the three parameters related to the Executor. Let's map these values onto the properties in the source code above

val coresPerExecutor = 4 // Cores per Executor, as specified by --executor-cores
val minCoresPerExecutor = 4 // 4 here, since --executor-cores was specified (otherwise it would default to 1)
val oneExecutorPerWorker = false // --executor-cores was specified, so a Worker may hold several Executors
val memoryPerExecutor = 4g // Memory per Executor, from --executor-memory
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable)
val assignedExecutors = new Array[Int](numUsable)
// min(--total-executor-cores, total free cores of the usable Workers) = 12 in this example
var coresToAssign = 12

For this example, assume there are 3 usable Workers with different cores and memory: Worker 0 with 8 cores and 12G, Worker 1 with 12 cores and 32G, and Worker 2 with 12 cores and 20G

val numUsable = 3
val assignedCores = new Array[Int](numUsable) 
val assignedExecutors = new Array[Int](numUsable) 

With these properties in place, let's walk through how the Master allocates resources across the three Workers

Given the resources of the three Workers, all of them will clearly pass the canLaunchExecutor() filter

Take the first Worker (8 cores, 12G) at index 0, and return to the three-level nested loop above

// Filter the usable Workers through the canLaunchExecutor() method
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) { // layer 1
  freeWorkers.foreach { pos => // layer 2
    var keepScheduling = true
    // Allocate on this Worker
    while (keepScheduling && canLaunchExecutor(pos)) { // layer 3
      // Step 1
      coresToAssign -= minCoresPerExecutor
      assignedCores(pos) += minCoresPerExecutor

      // Step 2: one Executor takes all of the Worker's cores
      if (oneExecutorPerWorker) {
        assignedExecutors(pos) = 1
      } else {
        // Each Executor takes part of the Worker's cores
        assignedExecutors(pos) += 1
      }
      // Step 3: horizontal (spread-out) or vertical allocation
      if (spreadOutApps) {
        keepScheduling = false
      }
    }
  }
  freeWorkers = freeWorkers.filter(canLaunchExecutor)
}

After the Worker at index 0 goes through steps 1 and 2, coresToAssign = 8, assignedCores(0) = 4 and assignedExecutors(0) = 1

At this point the first round of allocation on this Worker has finished steps 1 and 2, and step 3 remains. Let's look at it carefully: the layer-3 loop has two conditions, and keepScheduling is the one that step 3 can modify

  • If keepScheduling is set to false, the layer-3 loop ends and the Worker gets only one allocation in this round
  • If keepScheduling stays true, the layer-3 loop continues and the same Worker keeps being allocated resources until nothing is left to allocate

From the analysis above, keepScheduling controls whether a Worker gets everything in one go or whether the allocation is spread horizontally across Workers. However, it is another flag, spreadOutApps, that determines how keepScheduling is assigned. This flag is defined in Master and defaults to true, which means keepScheduling is set to false after each assignment and the Workers are allocated horizontally
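For reference, spreadOutApps is a field of the Master that is read from the configuration; in the version this article appears to follow it looks roughly like this:

// In Master: spread an application's Executors across Workers (true, the default)
// or pack them onto as few Workers as possible (false)
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)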

while (keepScheduling && canLaunchExecutor(pos)) { // layer 3

  // Step 3: horizontal (spread-out) or vertical allocation
  if (spreadOutApps) {
    keepScheduling = false
  }
}

Next, the Worker at index 1 (12 cores, 32G) is allocated 4 cores, leaving coresToAssign = 4

The check in canLaunchExecutor() still holds:

coresToAssign >= minCoresPerExecutor

Allocation then continues with the Worker at index 2 (12 cores, 20G), which also receives 4 cores

At this point coresToAssign = 0, which no longer satisfies canLaunchExecutor(), and the Master's resource allocation for this application ends. The final result: assignedCores = (4, 4, 4) and assignedExecutors = (1, 1, 1), i.e. one 4-core Executor on each of the three Workers
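To make the walk-through reproducible, here is a small self-contained sketch that replays the allocation loop with the example's numbers. It is illustrative only, not the real Spark code: the executorLimit check is omitted, and the real code also sorts the usable Workers by free cores in descending order, which does not change the outcome here.

object MasterAllocationDemo {
  def main(args: Array[String]): Unit = {
    // Worker resources from the example: index 0 -> 8C 12G, 1 -> 12C 32G, 2 -> 12C 20G
    val coresFree  = Array(8, 12, 12)
    val memoryFree = Array(12 * 1024, 32 * 1024, 20 * 1024) // MB

    // Values derived from the submit script
    val minCoresPerExecutor  = 4        // --executor-cores 4
    val memoryPerExecutor    = 4 * 1024 // --executor-memory 4g
    val oneExecutorPerWorker = false    // cores per Executor was specified
    val spreadOutApps        = true     // Master default

    val numUsable = coresFree.length
    val assignedCores     = new Array[Int](numUsable)
    val assignedExecutors = new Array[Int](numUsable)
    var coresToAssign = math.min(12, coresFree.sum) // 12 = --total-executor-cores

    // Simplified canLaunchExecutor(): the executorLimit check is left out
    def canLaunchExecutor(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      val enoughCores = coresFree(pos) - assignedCores(pos) >= minCoresPerExecutor
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        val enoughMemory = memoryFree(pos) - assignedMemory >= memoryPerExecutor
        keepScheduling && enoughCores && enoughMemory
      } else {
        keepScheduling && enoughCores
      }
    }

    // The three-level loop from scheduleExecutorsOnWorkers()
    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos)) {
          coresToAssign -= minCoresPerExecutor
          assignedCores(pos) += minCoresPerExecutor
          if (oneExecutorPerWorker) assignedExecutors(pos) = 1
          else assignedExecutors(pos) += 1
          if (spreadOutApps) keepScheduling = false // spread out: move on to the next Worker
        }
      }
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }

    println(s"assignedCores     = ${assignedCores.mkString(", ")}")     // 4, 4, 4
    println(s"assignedExecutors = ${assignedExecutors.mkString(", ")}") // 1, 1, 1
  }
}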

4. Master resource allocation 03

As we know from Spark's submission process, after dividing up the resources the Master tells the Workers to start the corresponding Executors. Let's keep tracing the source code to confirm this. Going back to the startExecutorsOnWorkers() method, look at the final for loop: it iterates over the assignedCores array we just computed and calls allocateWorkerResourceToExecutors() for each Worker that received cores

private def startExecutorsOnWorkers(): Unit = {

  // Iterate over the apps waiting for allocation, as recorded by the Master
  for (app <- waitingApps) {
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) // Cores to allocate per Executor

    if (app.coresLeft >= coresPerExecutor) {
      // Filter out unusable Workers: keep those whose free memory and free cores
      // meet the requirements, then sort by free cores in descending order
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor)
        .sortBy(_.coresFree).reverse
      // Compute the number of cores to assign on each Worker
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Notify the Workers to allocate resources to the Executors
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}

Now let's look at allocateWorkerResourceToExecutors(). As the name suggests, it allocates a Worker's resources to one or more Executors

The code logic of this method is simple, so we won’t explain it too much

private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {

  // How many Executors this Worker gets. In the example above: 4 (assigned cores) / 4 (cores per Executor) = 1
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    // Send a message to the Worker
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}

Next we look at launchExecutor(). At line 755 we can see that the Master sends a LaunchExecutor message to the Worker, telling the Worker to start the Executor

worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
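For context, launchExecutor() itself is short. A rough sketch based on the Spark version this article appears to follow (logging trimmed): it records the Executor on the WorkerInfo, tells the Worker to launch it, and also notifies the Driver with an ExecutorAdded message.

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  // Record the Executor in the Worker's bookkeeping
  worker.addExecutor(exec)
  // Tell the Worker to start the Executor process
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // Also let the Driver know that an Executor has been added
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}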

Now that the Master's allocation is complete, let's summarize the key points of how the Master allocates resources

Summary:

  • The core method of the Master's resource allocation is startExecutorsOnWorkers()
  • By default (spreadOutApps = true), the Master spreads an application's Executors horizontally across Workers

5. Executor startup process

The Worker's handling of the LaunchExecutor message is where the Executor process actually gets started. Since the author is working in a Windows environment, and the Java process-launching implementation differs between the Windows and Linux platforms, we won't focus on the platform details of starting the Executor process; what we care about is what the Executor does once it is up

Regarding the Executor itself, we need to recall a property from the article Spark source code 04 - Submit submission process and SparkContext preparation process: org.apache.spark.executor.CoarseGrainedExecutorBackend. This is the fully qualified class name of the Executor process that actually gets launched

Let's look at the CoarseGrainedExecutorBackend class. Since it has a companion object, we start directly from its main() method, which can be seen to wrap up the arguments and then call the run() method

Now let’s look at the run() method

Because of the length of the code we skip part of it and go straight to line 231: the familiar rpcEnv.setupEndpoint() method registers the CoarseGrainedExecutorBackend endpoint with the rpcEnv environment, and a background thread then invokes its onStart() method

env.rpcEnv.setupEndpoint("Executor".new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

Looking at the onStart() method, we can see that line 63 sends a RegisterExecutor message to the Driver. Doesn't this match the submission process? The Executor starts successfully and then registers back with the Driver (reverse registration)

ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
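For completeness, here is roughly what the surrounding onStart() looks like in the version the article appears to follow (error handling simplified): the Driver endpoint reference is looked up by driverUrl first, and only then is RegisterExecutor sent via ask.

override def onStart(): Unit = {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    driver = Some(ref)
    // Reverse registration: ask the Driver to register this Executor
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    case Success(_) => // the Driver replies true; nothing more to do here
    case Failure(e) => exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
  }(ThreadUtils.sameThread)
}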

Next we go back to the CoarseGrainedSchedulerBackend class and look at the receiveAndReply() method of DriverEndpoint. In the handling logic for the RegisterExecutor message we can see three branches; because of the length of the code, we will only introduce them briefly

if (executorDataMap.contains(executorId)) {
  // Branch 1: this Executor is already registered
  executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
  context.reply(true)
} else if (scheduler.nodeBlacklist != null &&
    scheduler.nodeBlacklist.contains(hostname)) {
  // Branch 2: the Executor's node is on the blacklist
  executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
  context.reply(true)
} else {
  // Branch 3: normal registration; the Driver records the Executor's information
  executorRef.send(RegisteredExecutor)
  context.reply(true)
}

From the source above we know that after the Executor registers successfully, the Driver sends it a RegisteredExecutor message. So let's go back to the CoarseGrainedExecutorBackend.receive() method and look at the handling logic for the RegisteredExecutor message; there is essentially just one simple line of code

executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

This executor is a field of CoarseGrainedExecutorBackend, and it is an important one

Let's now look at the Executor class, focusing on its first comment line: "Spark executor, backed by a threadpool to run tasks". Running tasks on a thread pool: isn't that an important hint? One of Executor's most important fields is its thread pool

At line 89 of the Executor source we find the threadPool field. Tasks really are executed inside the Executor's threadPool, and the Executor instance itself is the field held by CoarseGrainedExecutorBackend

Since there is a thread pool, there must be a method that submits work to it. Looking at launchTask() at line 174 of Executor, we can see that executing a task means wrapping it up and handing it to the threadPool
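A rough sketch of launchTask() in the Spark version the article appears to follow: the task is wrapped in a TaskRunner (a Runnable), recorded in runningTasks, and submitted to threadPool.

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  // Wrap the task in a Runnable, keep track of it, and hand it to the thread pool
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}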

Let's put together a picture of what we now know about Master resource allocation and the Executor startup process

Conclusion:

  • CoarseGrainedExecutorBackend is what we usually call the Executor role; when it starts, it registers back with the Driver
  • CoarseGrainedExecutorBackend holds an executor field, and the threads in the executor's thread pool are what actually run the tasks

At this point we have traced the entire SparkSubmit submission process