This series of articles is based on JerryLead's SparkInternals. This article adds the author's own understanding, annotations, and some source code for learning purposes. After comparing against the current code base, the core parts have not changed much and are still worth referencing.

Architecture

The first three chapters introduced, from the perspective of a job, how a user-written program is decomposed and executed step by step. This chapter discusses, from an architectural perspective, how the master, worker, driver, and executor work together to execute a job.

I really do not want to paste too much code in this document. This chapter has so much only to make it easy to locate things quickly when coming back to debug later. If you do not want to read the code, you can go straight to the diagrams and descriptions.

Deployment diagram

Repaste the deployment diagram shown in Overview:

The diagram is then discussed and refined in stages.

Job submission

The following figure shows how the driver program (presumably running on the Master node) generates jobs and submits them to the worker node for execution.

The logic on the Driver side is expressed in code:

finalRDD.action()
=> sc.runJob()

// generate job, stages and tasks
=> dagScheduler.runJob()
=> dagScheduler.submitJob()
// Place the JobSubmitted task in the event queue. The eventThread background thread will process the submitted task
=>    dagSchedulerEventProcessLoop.post(JobSubmitted)

// Rather than receiving an external message directly, the thread takes events from the blocking queue and pattern-matches to handle them
DAGSchedulerEventProcessLoop.onReceive()
=>  dagScheduler.handleJobSubmitted(jobId, ...)
=>      finalStage = createResultStage(finalRDD...)
            getShuffleDependencies()
                // Use stack to do depth-first traversal
                toVisit.dependencies.foreach {
                  // If it is a ShuffleDependency, it marks a parent stage
                  case shuffleDep: ShuffleDependency[_, _, _] =>
                    parents += shuffleDep
                  // Otherwise push the RDD onto the stack to keep searching for parent stages
                  case dependency =>
                    waitingForVisit.push(dependency.rdd)
                }
              getOrCreateShuffleMapStage()    
                 createShuffleMapStage()
                    new ShuffleMapStage()
                    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
=>      submitStage(finalStage)
// Recursively submit parent stages; if a stage has no missing parent stage, the current stage is submitted
submitStage(stage)
    // Get the parent stages that still need to be computed (parent stages may already have been executed)
    val missing = getMissingParentStages(stage)
        // Stage partition algorithm, based on a stack: a wide dependency creates a ShuffleMapStage
        case shufDep: ShuffleDependency[_, _, _] =>
            getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
        // A narrow dependency is pushed onto the stack so that its ancestors are traced for a ShuffleDependency / None
        case narrowDep: NarrowDependency[_] =>
            waitingForVisit.push(narrowDep.rdd)
    submitMissingTasks(stage, jobId.get)    // Submit the tasks if there is no missing parent stage
    submitStage(parent)                     // Otherwise recursively submit all parent stages first, then the child stage
dagScheduler.submitMissingTasks(stage: Stage, jobId: Int)
    taskBinary = sc.broadcast(taskBinaryBytes)    // Broadcast the serialized task
    // According to the stage type, generate the corresponding ShuffleMapTask / ResultTask
    val tasks: Seq[Task[_]] = new ShuffleMapTask / new ResultTask
    // Submit the taskSet
=> taskScheduler.submitTasks(new TaskSet(...))
        val manager = createTaskSetManager(taskSet, maxTaskFailures)
        // The scheduling order is decided by schedulableBuilder
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    // Call SchedulerBackend's reviveOffers method to schedule the tasks and decide which Executor each task runs on
=> schedulerBackend.reviveOffers()
        driverEndpoint.send(ReviveOffers)    // Send the scheduling message to the driver
    // The driver side schedules the tasks for execution
    CoarseGrainedSchedulerBackend.receive()
        case ReviveOffers =>
            taskScheduler.resourceOffers(workOffers)
            launchTasks()    // Send a message to each executor to start the task
                foreach task: executorEndpoint.send(LaunchTask())

Text description of the code:

When the user program calls val sc = new SparkContext(sparkConf), this statement creates many objects related to driver communication, job execution, threads, and so on, and establishes the program's status as the driver.
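As a concrete reference, here is a minimal user-program sketch (the app name and input path are made up) showing where the objects above come into play: new SparkContext() builds the driver-side scheduler objects, and the action at the end triggers sc.runJob().

import org.apache.spark.{SparkConf, SparkContext}

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("WordCountSketch")
    val sc = new SparkContext(sparkConf)            // driver-side scheduler objects are created here

    val counts = sc.textFile("hdfs:///tmp/input")   // building the logical plan: a chain of RDDs
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)                           // introduces a ShuffleDependency

    counts.collect().foreach(println)               // action(): a job is generated and submitted
    sc.stop()
  }
}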

The Job logical execution diagram is generated

compute() of each RDD specifies how to compute the records of its partitions, and getDependencies() defines the data dependencies between the partitions of an RDD and those of its parent RDDs.
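To make compute() and getDependencies() concrete, here is a hedged sketch (not Spark's real source; MyMappedRDD is a made-up name) of an RDD subclass that behaves like map():

import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Each partition of this RDD depends one-to-one on the corresponding parent partition
class MyMappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
  extends RDD[U](prev) {

  // Same partitions as the parent RDD
  override protected def getPartitions: Array[Partition] = prev.partitions

  // Narrow dependency: partition i only needs parent partition i
  override protected def getDependencies: Seq[Dependency[_]] =
    Seq(new OneToOneDependency(prev))

  // How the records of one partition are computed: apply f to the parent's iterator
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    prev.iterator(split, context).map(f)
}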

The physical Job execution diagram is generated

Each action() triggers the generation of a job. Stages are divided inside dagScheduler.runJob(), and during submitStage() the concrete ShuffleMapTasks or ResultTasks contained in each stage are generated. The tasks are then packaged into a TaskSet and handed to the taskScheduler. If the TaskSet can run, the tasks are distributed to CoarseGrainedSchedulerBackend for execution.
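A quick way to see where these stage boundaries will fall, before anything is submitted, is toDebugString (sketch assumes an existing sc, e.g. in spark-shell; the path is made up). Every ShuffleDependency in the printed lineage becomes a stage boundary when the action runs.

val pairs = sc.textFile("hdfs:///tmp/input").map(line => (line.length, line))
val grouped = pairs.groupByKey()      // ShuffleDependency => a ShuffleMapStage will be created
println(grouped.toDebugString)        // lineage with the shuffle boundary marked
grouped.count()                       // action(): ResultStage plus its parent ShuffleMapStage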

Assign the Task

After CoarseGrainedSchedulerBackend receives the TaskSet, it serializes each task and sends it to the CoarseGrainedExecutorBackend endpoint on the worker node chosen by the scheduler.

Job receiving

After receiving tasks, the Worker performs the following operations

// The driver sends LaunchTask information to executor
executorEndpoint.send(LaunchTask(serializedTask))
=> executor.launchTask()
    // The executor uses a newCachedThreadPool with an unbounded queue
=> executor.threadPool.execute(new TaskRunner(context, taskDescription))

Executor wraps each task into a TaskRunner and takes an idle thread from the thread pool to run it. A CoarseGrainedExecutorBackend process has one and only one executor object.
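The threading model described above can be pictured with a small sketch (not Spark's Executor code; the object and method names are made up): tasks arrive as messages, each one is wrapped in a Runnable and handed to an unbounded cached thread pool.

import java.util.concurrent.Executors

object ExecutorSketch {
  private val threadPool = Executors.newCachedThreadPool()   // grows on demand, no bounded queue

  // Hypothetical stand-in for TaskRunner: deserialize, run, report back
  def launchTask(taskId: Long, serializedTask: Array[Byte]): Unit = {
    threadPool.execute(new Runnable {
      override def run(): Unit = {
        // val task = deserialize(serializedTask); val value = task.run(); send StatusUpdate to the driver
        println(s"task $taskId running on ${Thread.currentThread().getName}")
      }
    })
  }
}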

The Task to run

The following figure shows the execution flow of the task after it is assigned to the worker node and how the driver handles the result of the task.

Executor receives the serialized task, deserializes it into a normal task, and then runs the task to get its execution result directResult, which is to be sent back to the driver. However, the data packets sent through the Actor should not be too large. If the result is large (for example, the result of groupByKey), it is stored in the local "memory + disk" managed by blockManager, and only an indirectResult is sent to the driver; when the driver needs the actual result, it fetches it via HTTP. If the result is not large (less than spark.akka.frameSize = 10 MB), it is sent directly to the driver.

If the directResult produced by the task is larger than akka.frameSize, it is stored in the local "memory + disk" managed by blockManager. The memoryStore in BlockManager maintains a LinkedHashMap to hold data in local memory; the total size of the data stored in the LinkedHashMap must not exceed Runtime.getRuntime.maxMemory * spark.storage.memoryFraction (default 0.6). If the LinkedHashMap does not have enough space for the new data, the data is handed to diskStore and stored on disk, provided that the data's storageLevel includes disk.
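A minimal sketch of the "size-capped LinkedHashMap, overflow goes to disk" behaviour described above (this is not Spark's MemoryStore; the class and parameter names are made up, and eviction of old blocks is left out):

import scala.collection.mutable

class TinyMemoryStore(maxMemoryBytes: Long, spillToDisk: (String, Array[Byte]) => Unit) {
  // Insertion-ordered map playing the role of MemoryStore's LinkedHashMap
  private val entries = mutable.LinkedHashMap.empty[String, Array[Byte]]
  private var currentBytes = 0L

  def put(blockId: String, data: Array[Byte], storageLevelHasDisk: Boolean): Unit = {
    if (currentBytes + data.length <= maxMemoryBytes) {
      entries(blockId) = data            // enough room: keep the block in memory
      currentBytes += data.length
    } else if (storageLevelHasDisk) {
      spillToDisk(blockId, data)         // no room: hand the block to diskStore
    }                                    // otherwise the block is simply not stored
  }
}

// maxMemoryBytes would correspond to Runtime.getRuntime.maxMemory * spark.storage.memoryFraction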

Here the threshold for transmitting directResult directly has since been changed to Math.min(spark.rpc.message.maxSize (default 128 MB), spark.task.maxDirectResultSize (default 1L << 20)).

The memory space management above needs to be revalidated

In TaskRunner.run()
=> coarseGrainedExecutorBackend.statusUpdate(TaskState.RUNNING)
        driverRef.send(StatusUpdate)
=> updateDependencies(addedFiles, addedJars) // Download dependent resources
=> task = ser.deserialize(serializedTask)
=> value = task.run(taskId)    // Calls the runTask method of the subclass ShuffleMapTask/ResultTask
=> directResult = new DirectTaskResult(ser.serialize(value), accumUpdates)   //Accumulator
=> if (resultSize > maxResultSize)  // The default maxResultSize is 1G
       //IndirectTaskResult is a reference; the DirectTaskResult itself is stored in the worker's BlockManager
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
   else if (resultSize > maxDirectResultSize) {  // The result is larger than maxDirectResultSize (the default RPC message size, 128 MB) but smaller than 1 GB
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
    } else {
        ser.serialize(directResult)
    }
=> coarseGrainedExecutorBackend.statusUpdate(TaskState.FINISHED,result)
=>      driverRef.send(StatusUpdate,result)

ShuffleMapTask and ResultTask generate different results. A ShuffleMapTask generates a MapStatus, which contains two items: the BlockManagerId of the BlockManager the task belongs to (executorId + host, port, nettyPort), and the size of each FileSegment output by the task. The result generated by a ResultTask is the result of executing func on its partition; for example, the func of count() counts the number of records in a partition. Because ShuffleMapTask needs to write FileSegments to disk, it needs output stream writers, which are generated and controlled by the shuffleBlockManager inside blockManager.

In task.run(taskId) // The runTask method of the subclass is called
// if the task is ShuffleMapTask
=> shuffleMapTask.runTask(context)
=> shuffleManager.getWriter.write(rdd.iterator(partition, context))
//MapStatus contains the address where the task writes shuffle files
=> return MapStatus(blockManager.blockManagerId, Array[compressedSize(fileSegment)])

//If the task is ResultTask, run func directly
=> return func(context, rdd.iterator(split, context))

When the driver receives the execution result of a task, it does a series of things: it first tells the taskScheduler that the task has finished, and then it analyzes the result. Because the result could be an indirectResult, it may first need to call blockManager.getRemoteBytes() to fetch the actual result; this process is detailed in the next section. After obtaining the actual result, it handles it according to the task type. If it is the result of a ResultTask, resultHandler is used to compute on it on the driver side (for example, count() sums the results of all ResultTasks). If the result is the MapStatus of a ShuffleMapTask, the MapStatus (location and size information of the FileSegments output by the ShuffleMapTask) is stored in the mapStatuses data structure in mapOutputTrackerMaster for the reducers to use during shuffle. If the task received by the driver is the last task of its stage, the next stage is submitted; if the stage is already the last stage, dagScheduler is told that the job has been completed.
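As a tiny illustration of the ResultTask branch (numbers are purely illustrative; resultHandler here mimics what count() does on the driver side), summing the per-partition results:

var total = 0L
val resultHandler: (Int, Long) => Unit = (_, partitionCount) => total += partitionCount

// Hypothetical results of three ResultTasks, one per partition
val partitionCounts = Seq(1000L, 996L, 1024L)
partitionCounts.zipWithIndex.foreach { case (count, index) => resultHandler(index, count) }
println(total)   // 3020, what count() would return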

After driver receives StatusUpdate(result)
=> taskSchedulerImpl.statusUpdate(taskId, state, result.value)
//TaskState.isFinished(state) && state == TaskState.FINISHED
=> taskResultGetter.enqueueSuccessfulTask(taskSet, tid, result)
        taskResultExecutor.execute(new Runnable().run())
            if result is directResult
                directResult.value(serializer)
            if result is IndirectResult
                serializedTaskResult = blockManager.getRemoteBytes(blockId)
=>          taskSchedulerImpl.handleSuccessfulTask(taskSetManager, tid, result)
                //Marks a task as successful and notifies the DAGScheduler that the task has ended.
=>              taskSetManager.handleSuccessfulTask(tid, taskResult)
                    sched.backend.killTask() // Kill all other similar task attempts
                    // Notify dagScheduler that the task is complete
=>                  dagScheduler.taskEnded(result.value, result.accumUpdates)
                        eventProcessLoop.post(CompletionEvent)  // Start thread processing

dagScheduler.doOnReceive()
    dagScheduler.handleTaskCompletion(completion)
=>      if task Success
            if task is ResultTask
                updateAccumulators(event)
                if (job.numFinished == job.numPartitions) 
                    markStageAsFinished(resultStage)
                    //Removes state for job and any stages that are not needed by any other job
                    cleanupStateForJobAndIndependentStages(job)
                    listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                job.listener.taskSucceeded(outputId, result)// Notify JobWaiter that a task has succeeded
                    jobWaiter.taskSucceeded(index, result)
                    resultHandler(index, result)

             if task is ShuffleMapTask
                updateAccumulators(event)
                 shuffleStage.pendingPartitions -= task.partitionId
                 shuffleStage.addOutputLoc(smt.partitionId, mapStatus)
                if (all tasks in current stage have finished)
                    mapOutputTracker.registerMapOutputs(shuffleId, Array[MapStatus])
                        mapStatuses.put(shuffleId, Array[MapStatus])
=>              submitWaitingChildStages(stage)
                    waitingStages.filter(_.parents.contains(parent)).foreach(submitStage)
        
        // Other possible scenarios
=>      if task Resubmitted
            pendingPartitions += task.partitionId    //TaskSetManagers only support ShuffleMapStage Resubmitted
=>      if task FetchFailed
            if fail times > (spark.stage.maxConsecutiveAttempts,4)
                abortStage(failedStage, abortMessage)
            else new Runnable.run(){eventProcessLoop.post(ResubmitFailedStages)}
            handleExecutorLost(executorId)// Mark Executor lost when there are several fetch failures
                blockManagerMaster.removeExecutor(execId)
                foreach ShuffleMapStage in executor
                    stage.removeOutputsOnExecutor(execId)
                    mapOutputTracker.registerMapOutputs(shuffleId, Array[MapStatus])
=>      if task exceptionFailure
            // This is an accumulator~~
             updateAccumulators(event)

Shuffle read

The previous sections described how a task runs and how its result is processed. This section describes how the reducer (a task that needs shuffle) obtains its input data. How the reducer processes the input data was explained in Shuffle Read in the previous chapter.

Question: How does reducer know where to fetch data?

This information is sent to the driver’s mapOutputTrackerMaster when ShuffleMapTask completes and stored in mapStatuses: HashMap<stageId, Array[MapStatus]>
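The bookkeeping can be sketched as follows (types are simplified and the *Sketch names are made up): for each shuffle, the driver keeps one MapStatus per map task, recording where that task's output lives and how big each reducer's FileSegment is.

import scala.collection.mutable

case class BlockManagerIdSketch(executorId: String, host: String, port: Int)
case class MapStatusSketch(location: BlockManagerIdSketch, segmentSizes: Array[Long])

// shuffleId -> one MapStatus per ShuffleMapTask
val mapStatuses = mutable.HashMap.empty[Int, Array[MapStatusSketch]]

// Registered on the driver when each ShuffleMapTask finishes
mapStatuses(0) = Array(
  MapStatusSketch(BlockManagerIdSketch("exec-1", "worker1", 7337), Array(120L, 300L)),
  MapStatusSketch(BlockManagerIdSketch("exec-2", "worker2", 7337), Array(80L, 250L))
)

// What reducer 1 asks for before fetching: (where to go, how many bytes of its FileSegment are there)
val reducer1Inputs = mapStatuses(0).map(s => (s.location, s.segmentSizes(1)))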

When the reducer needs to fetch input data, it first calls blockStoreShuffleFetcher to obtain the locations of the input data (FileSegments). blockStoreShuffleFetcher does this by asking the local MapOutputTrackerWorker, which uses mapOutputTrackerMasterActorRef to communicate with mapOutputTrackerMasterActor and obtain the MapStatus information. blockStoreShuffleFetcher processes the obtained MapStatus information, extracting from which nodes the reducer should fetch which FileSegments; this information is stored in blocksByAddress. blockStoreShuffleFetcher then hands the task of actually fetching the FileSegment data to basicBlockFetcherIterator.

rdd.iterator()
=> rdd(e.g., ShuffledRDD/CoGroupedRDD).compute()
=> SparkEnv.get.shuffleFetcher.fetch(shuffledId, split.index, context, ser)
=> blockStoreShuffleFetcher.fetch(shuffleId, reduceId, context, serializer)
=> statuses = MapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId)

=> blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = compute(statuses)
=> basicBlockFetcherIterator = blockManager.getMultiple(blocksByAddress, serializer)
=> itr = basicBlockFetcherIterator.flatMap(unpackBlock)

After BasicBlockFetcherIterator receives the data-fetching task, it generates fetchRequests one by one; each fetchRequest contains the task of fetching several FileSegments from one node. The figure shows the white FileSegments (FS) that reducer 2 needs to obtain from three worker nodes. The total data-fetching task is represented by blocksByAddress: fetch 4 from the first node, 3 from the second node, and 4 from the third node.

To speed up the fetching process, it is natural to divide the total task into subtasks (fetchRequests) and allocate a thread to each of them. Spark starts 5 parallel fetch threads for each reducer (Hadoop also starts 5 threads by default). Because the fetched data is buffered in memory, the amount of data fetched at one time cannot be too large; Spark limits it to spark.reducer.maxSizeInFlight = 48 MB. Note that the 48 MB is shared by the 5 fetch threads, so when dividing into subtasks, try to keep each fetchRequest no larger than 48 MB / 5 = 9.6 MB. For example, on Node 1, Size(FS0-2) + Size(FS1-2) < 9.6 MB but adding the next FileSegment would exceed it, so the request is cut between t1-r2 and t2-r2, and there are two fetchRequests going to Node 1. Will there be fetchRequests larger than 9.6 MB? Of course: if a single FileSegment is especially large, it still has to be fetched in one go. In addition, if some FileSegments the reducer needs are already on this node, they are read locally. Finally, the fetched FileSegments are deserialized and their records are provided to rdd.compute() in the form of an iterator, and the whole shuffle read ends.
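The splitting into fetchRequests can be sketched like this (not Spark's code; FetchReq and the function name are made up): FileSegments of one node are accumulated until the running size reaches targetRequestSize = maxSizeInFlight / 5, then the request is cut, just like the t1-r2 / t2-r2 cut in the figure.

import scala.collection.mutable

case class FetchReq(node: String, segments: Seq[(String, Long)])   // (blockId, size)

def splitIntoRequests(node: String,
                      segments: Seq[(String, Long)],
                      targetRequestSize: Long): Seq[FetchReq] = {
  val requests = mutable.ArrayBuffer.empty[FetchReq]
  var current = mutable.ArrayBuffer.empty[(String, Long)]
  var currentSize = 0L
  for ((blockId, size) <- segments) {
    current += ((blockId, size))
    currentSize += size
    if (currentSize >= targetRequestSize) {        // request is "full": cut it here
      requests += FetchReq(node, current.toSeq)
      current = mutable.ArrayBuffer.empty
      currentSize = 0L
    }
  }
  if (current.nonEmpty) requests += FetchReq(node, current.toSeq)
  requests.toSeq
}

// With maxSizeInFlight = 48 MB, targetRequestSize is about 9.6 MB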

// Spark 2.x shuffle read, for ShuffledRDD.compute()
BlockStoreShuffleReader.read()
    // Obtain the metadata (locations of the data stored by ShuffleMapTasks) by sending a message
=>  mapOutputTracker.getMapSizesByExecutorId(shuffleId, startPartition, endPartition)
        val statuses = getStatuses(shuffleId)    // Array[MapStatus]
        // Fetched remotely from the driver's MapOutputTrackerMasterEndpoint; handled by another thread,
        // messages are passed through a LinkedBlockingQueue
        val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
        // Return format: Seq[(BlockManagerId, Seq[(shuffle block id, shuffle block size)])]
        MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
    // The amount of data fetched at a time
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024
    // The maximum number of remote fetch requests in flight
    SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue)
=>  new ShuffleBlockFetcherIterator().initialize()
        splitLocalRemoteBlocks()    // Split the blocks into local and remote ones
            // Maximum bytes per request; 5 requests run in parallel
            val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
            // If the executorId differs from this blockManager's executorId, fetch remotely
            address.executorId != blockManager.blockManagerId.executorId
            remoteRequests += new FetchRequest(address, curBlocks)
        fetchUpToMaxBytes()    // Send remote requests to get blocks
            sendRequest(fetchRequests.dequeue())    // Send the requests one by one
            // If the requested data is too big it is written to disk shuffleFiles, otherwise it is not
=>          shuffleClient.fetchBlocks(blockIds.toArray)
                new OneForOneBlockFetcher().start()    // Send the fetch request
        fetchLocalBlocks()    // Get the local blocks
            foreach block { blockManager.getBlockData(blockId) }
            // If it is a shuffle block, it is resolved through the shuffle block resolver
=>          ExternalShuffleBlockResolver.getSortBasedShuffleBlockData()
                // new File() reads the index file
                File indexFile = getFile("shuffle_" + shuffleId + "_" + mapId + "_0.index")
                File data = getFile("shuffle_" + shuffleId + "_" + mapId + "_0.data")
                // FileSegmentManagedBuffer(data, offset, length): offset and length come from the index file,
                // so only part of the data file is read, length bytes at the given offset through a channel
            // If it is not a shuffle block, read directly from memory or disk
            diskStore.getBytes(blockId)
                FileInputStream(diskManager.getFile(new File(blockId.name))).getChannel()
            memoryStore.getValues(blockId)
                entries.get(blockId)    // Get directly from LinkedHashMap[blockId, MemoryEntry[_]]
    // If dep has map-side combine, mergeValue is replaced by mergeCombiners
=>  aggregator.combineCombinersByKey(combinedKeyValuesIterator)
        new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners).insertAll(iter)
            // Operates directly on the map; update is a closure containing the mergeValue and createCombiner functions
            currentMap.changeValue(curEntry._1, update)
    // If dep has no map-side combine, single values need to be merged
=>  aggregator.combineValuesByKey(keyValuesIterator)
        new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners).insertAll(iter)
    // If keyOrdering is defined, sort the data in a buffer and spill when the size exceeds a threshold
=>  new ExternalSorter().insertAll(aggregatedIter)
        // A map is used if shouldCombine (aggregator != None), otherwise a buffer (an array of keys and values)
        if (shouldCombine)  foreach record { map.changeValue((getPartition(key), key), update) }
        else                foreach record { buffer.insert(getPartition(key), key, value) }
        maybeSpillCollection()
            estimateSize()    // The collection size is estimated by sampling
            // Spill the sorted in-memory collection to a file
            val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
        externalSorter.merge()    // Returns (partition, Iterator) pairs
            (0 until numPartitions).iterator.map { p =>
                (p, mergeWithAggregation(mergeCombiners))    // Uses mergeSort internally
                (p, mergeSort(ordering.get))
                (p, iterators.iterator.flatten)    // If neither aggregation nor sorting is needed, return directly
            }

Let’s talk about some details:

How does reducer send fetchRequest information to a target node? How does the target node process fetchRequest information and read FileSegment and send it back to the reducer?

This part has changed in Spark 2.x and needs to be compared against the new code again

When rdd.iterator() encounters a ShuffleDependency, it calls BasicBlockFetcherIterator to fetch the FileSegments. BasicBlockFetcherIterator uses the connectionManager of blockManager to send fetchRequests to the connectionManagers of the other nodes. ConnectionManagers communicate with each other in NIO mode. Other nodes, for example the connectionManager on worker node 2, receive the message and hand it to blockManagerWorker. blockManagerWorker uses the diskStore in blockManager to read the FileSegments required by the fetchRequest from the local disk, and then sends the FileSegments back through connectionManager. If FileConsolidation is used, diskStore also needs the shuffleBlockManager to provide the location of a blockId. If a FileSegment does not exceed spark.storage.memoryMapThreshold = 8 KB, diskStore puts the FileSegment directly into memory when reading it; otherwise the FileSegment is read with the memory-mapping method of RandomAccessFile's FileChannel (so that large FileSegments can also be loaded into memory).

When BasicBlockFetcherIterator receives the serialized FileSegments returned by other nodes, it puts them into fetchResults: Queue, which is the equivalent of the softBuffer mentioned in Shuffle Details. If some FileSegments BasicBlockFetcherIterator needs are local, they are read directly from the local files through diskStore and also put into fetchResults. Finally, the reducer reads the records from the FileSegments and processes them.

After the blockManager receives the fetch request

=> connectionManager.receiveMessage(bufferMessage)
=> handleMessage(connectionManagerId, message, connection)

// invoke blockManagerWorker to read the block (FileSegment)
=> blockManagerWorker.onBlockMessageReceive()
=> blockManagerWorker.processBlockMessage(blockMessage)
=> buffer = blockManager.getLocalBytes(blockId)
=> buffer = diskStore.getBytes(blockId)
=> fileSegment = diskManager.getBlockLocation(blockId)
=> shuffleManager.getBlockLocation()
=> if(fileSegment < minMemoryMapBytes)
     buffer = ByteBuffer.allocate(fileSegment)
   else
     channel.map(MapMode.READ_ONLY, segment.offset, segment.length)

Each reducer holds a BasicBlockFetcherIterator, whose fetchResults can in theory hold 48 MB. After one FileSegment in fetchResults has been read off, more FileSegments are fetched until the 48 MB is filled again.

BasicBlockFetcherIterator.next()
=> result = results.take()
=> while (!fetchRequests.isEmpty && (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
        sendRequest(fetchRequests.dequeue())
      }
=> result.deserialize()

Discussion

It took me three days to write this chapter, and they were the most exhausting days I have had all month. Anyway, keep summarizing.

In fact, there is not much to say about the architecture itself: it is designed so that functions are independent, modules are independent, and coupling is loose. BlockManager is well designed, but it manages too many things (blocks, memory, disk, communication).

This chapter mainly discusses how the modules in the system cooperate to complete job generation, submission, execution, result collection, result computation, and shuffle. A lot of code is pasted and many diagrams are drawn; although there are many details, this is still far from the level of detail of the source code. If something is unclear, please read the source code following the description.

If you want to learn more about blockManager, you can refer to Spark source code analysis -Storage module by Jerry Shao.