This series of articles is based on JerryLead’s SparkInternals. This article adds the author’s own understanding, annotations, and some source code reading for learning purposes. After comparing against the current source, the core parts have not changed much and are still worth referencing.

Broadcast

Broadcast, as the name implies, is to send data from one node to other nodes. For example, if the driver has a table and tasks running on other nodes need to look up that table, the driver can copy the table to those nodes so that the tasks can query it locally. How to implement a reliable and efficient broadcast mechanism is a challenging problem. Take a look at this quote from Spark’s website:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Question: Why can only read-only variables be broadcast?

This raises the question of consistency. If a variable could be updated, should all other nodes see the update once one node changes it? If multiple nodes update it at the same time, in what order are the updates applied? How would they be synchronized? Fault tolerance also becomes harder. To avoid these data consistency problems, Spark supports broadcasting only read-only variables.
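To illustrate why mutability would be a trap, here is a minimal sketch (the mutation below is exactly what you should not do): mutating the local copy of a broadcast value on an executor changes only that executor’s copy and is never propagated back to the driver or to other executors.

val bArr = sc.broadcast(Array(1, 2, 3))
sc.parallelize(1 to 6, 2).foreach { _ =>
  bArr.value(0) = 99 // changes only this executor's local copy; an anti-pattern
}
// Back on the driver, bArr.value(0) is still 1 (in cluster mode).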

Question: Why broadcast to nodes instead of to each task?

Because each task runs as a thread, and all tasks running in the same executor process belong to the same application, one copy per executor can be shared by all of its tasks.

Question: How to use broadcast?

Driver program example:

val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)

val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)

The driver uses sc.broadcast() to declare the data to be broadcast; the returned bdata has type Broadcast.

When an rdd.transformation(func) needs bdata, it is accessed directly inside func via bdata.value, as in bdata.value.size inside map() in the example above.

Question: How to implement broadcast?

Broadcast has an interesting implementation mechanism:

1. Distribute the bdata meta information before distributing tasks

The driver creates a local folder to store the data to be broadcast and starts an HttpServer that can serve that folder. When val bdata = sc.broadcast(data) is called, the data is written into the folder and also into the driver’s blockManager (StorageLevel = memory + disk), obtaining a blockId of type BroadcastBlockId.

// initialize
sparkSession.build()#env.broadcastManager.initialize()
    new TorrentBroadcastFactory.initialize()

// use broadcast
sc.broadcast()
    broadcastManager.newBroadcast()
        // Divide the object into multiple blocks and put those blocks in the block manager
        new TorrentBroadcast[T](value_, nextBroadcastId.getAndIncrement()).writeBlocks()
            // Save a full copy on the driver
            SparkEnv.get.blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)
                doPutIterator()#memoryStore.putIteratorAsValues()#diskStore.put(blockId)
            // Cut the value into 4 MB blocks ("spark.broadcast.blockSize", "4m") and get the meta information
            blockMetaDatas = TorrentBroadcast.blockifyObject(value, blockSize, ...)
            foreach block metaData:
                blockManager.putBytes(BroadcastBlockId, MEMORY_AND_DISK_SER, ...)#doPutBytes()#memoryStore.putIteratorAsValues()#diskStore.putBytes()
                    // Replicate asynchronously. sc.broadcast() keeps only one copy on the driver side
                    // (replication = 1); the number of copies grows as executors fetch the data
                    if level.replication > 1: ThreadUtils.awaitReady(replicate(ByteBufferBlockData(bytes, false), ...))

// The replication rules, for reference
blockManager.replicate()
    // Request the ids of other BlockManagers
    val initialPeers = getPeers(false)
        blockManagerMaster.getPeers(blockManagerId).sortBy(_.hashCode)
            // Ask the driver for the other nodes
            driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
                // BlockManagerMasterEndpoint returns the blockManagerIds, excluding the driver and the current node
                blockManagerInfo.keySet.contains(blockManagerId)#blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
    foreach block, replicate to (replication - 1) nodes: blockTransferService.uploadBlockSync()
        // Send the block to the peer blockManager, which saves the data and notifies the driver
        blockManager.putBytes()#reportBlockStatus(blockId, putBlockStatus)
            blockManagerMasterEndpoint.updateBlockInfo() // Update the block info on the driver

When rdd.transformation(func) is called, if func uses bdata, the driver’s submitTask() serializes bdata together with func to get the serialized task. Note that the data contained in bdata is not serialized during this serialization.

// TorrentBroadcast.scala does not serialize the data contained in bdata
// @transient indicates that _value is not serialized
@transient private lazy val _value: T = readBroadcastBlock()

/** Used by the JVM when serializing this object. */
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
  assertValid()
  out.defaultWriteObject()
}

As described in the previous chapter, serialized tasks are transferred from the driverEndpoint to executors via the RPC message mechanism. RPC messages should not be too large, while the actual data may be large, so the broadcast data cannot be shipped inside the task itself.
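To make this concrete, here is a hedged sketch contrasting the two approaches (loadTable() is a made-up stand-in for building a large lookup table; rdd is the one from the example above):

val bigTable: Map[Int, String] = loadTable() // hypothetical large lookup table

// Without broadcast: bigTable is captured by the closure and serialized into every task
rdd.map(k => bigTable.getOrElse(k, "miss"))

// With broadcast: the task carries only the small Broadcast handle;
// the data itself is fetched once per executor and shared by all its tasks
val bTable = sc.broadcast(bigTable)
rdd.map(k => bTable.value.getOrElse(k, "miss"))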

Why does the driver put the data into both the disk and the blockManager? Putting it on disk lets the HttpServer serve it; putting it into the blockManager makes it convenient for the driver program itself to use bdata (in fact, it would also work without putting it in the blockManager).

So when is the real data sent? When an executor deserializes a task, it also deserializes the bdata object in the task, which calls bdata’s readObject() method. This method first asks the local blockManager whether bdata’s data is already there. If it is, the data is used directly; if not, one of the two fetch methods below is used to fetch it. Once fetched, the data is stored in the local blockManager, so tasks that run later and need bdata do not have to fetch it again.

// runJob() -> dagScheduler.submitMissingTasks(stage: Stage, jobId: Int)
val taskIdToLocations = getPreferredLocs(stage.rdd, id) // getCacheLocs(): get the RDD locations locally or from the driver
var taskBinary: Broadcast[Array[Byte]] = null
try {
  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  // For ResultTask, serialize and broadcast (rdd, func).
  val taskBinaryBytes: Array[Byte] = stage match {
    case stage: ShuffleMapStage =>
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    case stage: ResultStage =>
      // Serializes func, which contains the broadcast variable,
      // but not the data contained in the broadcast variable
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
  }
  taskBinary = sc.broadcast(taskBinaryBytes) // broadcast the serialized task
  taskScheduler.submitTasks(new TaskSet(...))
  ...
// TorrentBroadcast.scala
// With lazy, _value is deserialized only when it is first used
@transient private lazy val _value: T = readBroadcastBlock()

TorrentBroadcast.readBroadcastBlock()
    blockManager.getLocalValues() // read locally first
        memoryStore.getValues(blockId)#diskStore.getBytes(blockId)
    readBlocks() // if not available locally, read from the driver / other executors
        foreach block:
            blockManager.getRemoteBytes(BroadcastBlockId(id, "piece" + pid))
            blockManager.putBytes() // save the block locally
    // save the reassembled broadcast value locally
    blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)
    blocks.foreach(_.dispose()) // delete the blocks that were stored piecewise

There are two implementations of broadcasting the data:

2. HttpBroadcast

Note: the broadcast package in Spark 2.2 has removed HttpBroadcast, leaving only TorrentBroadcast.

As the name implies, with HttpBroadcast each executor fetches the data from the driver over HTTP.

The driver first prepares the data to broadcast; calling sc.broadcast(data) invokes the factory method to create an HttpBroadcast object. The first thing this object does is store the data in the driver’s blockManager, with StorageLevel memory + disk and a blockId of type BroadcastBlockId.

The driver also writes the broadcast data to the local disk, for example to /var/folders/87/grpn1_fn4xq5wdqmxk31v0l00000gp/T/spark-6233b09c-3c72-4a4d-832b-6c0791d0eb9c/broadcast_0 after the write. This folder serves as the HttpServer’s file directory.

HttpBroadcast.initialize() is called: the driver creates a local directory for storing the broadcast data and starts the httpServer that can serve that directory.

Fetching the data: when an executor deserializes a task, it also deserializes the bdata object in the task and calls bdata’s readObject() method. This method first asks the local blockManager whether bdata’s data is already there. If it is, the data is used directly; if not, the executor connects over HTTP to the httpServer on the driver and fetches the data. Once fetched, the data is stored in the local blockManager, so tasks that run later and need bdata do not have to fetch it again.

The biggest problem with HttpBroadcast is that the node the driver resides on can become congested, because all executors on the workers fetch the data from the driver.

3. TorrentBroadcast

To solve the driver single-point network bottleneck of HttpBroadcast, Spark designed a broadcast method called TorrentBroadcast, similar to the widely used BitTorrent technique. The basic idea is to partition the data into data blocks; once an executor has fetched some blocks, that executor can itself serve as a data server. As more executors fetch data, more data servers join, and the data quickly propagates to all executors.

HttpBroadcast transmits data using the traditional HTTP protocol and httpServer, while TorrentBroadcast transmits it via blockManager.getRemoteValues() => NIO ShuffleClient, the data transfer method introduced in the last chapter. Reading the data is similar to reading a cached RDD; see the last figure in CacheAndCheckpoint.

Here are some details about TorrentBroadcast:

The driver end:

The driver first serializes the data into a byteArray, then cuts it into data blocks of BLOCK_SIZE (set by spark.broadcast.blockSize = 4MB). Each data block is held by a TorrentBlock object. After the byteArray has been cut up, it can be reclaimed, so memory consumption can reach 2 × Size(data), but only temporarily.
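A minimal sketch of the chunking idea, assuming the value has already been serialized to a byte array (this is an illustration, not Spark’s actual blockifyObject, which streams through a serializer):

import java.nio.ByteBuffer

def blockify(bytes: Array[Byte], blockSize: Int = 4 * 1024 * 1024): Array[ByteBuffer] = {
  val numBlocks = (bytes.length + blockSize - 1) / blockSize // ceiling division
  Array.tabulate(numBlocks) { i =>
    val from = i * blockSize
    ByteBuffer.wrap(bytes, from, math.min(blockSize, bytes.length - from)) // views, no copy
  }
}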

After the cutting is complete, the block information (called meta information) is stored in the driver’s blockManager with StorageLevel memory + disk, and the driver’s blockManagerMaster is notified that the meta information has been stored. Notifying blockManagerMaster is important, because blockManagerMaster is accessible to the driver and all executors, so information stored there becomes global information.

Then each data block is stored in the driver’s blockManager with StorageLevel memory + disk, and blockManagerMaster is again notified that the blocks have been stored. At this point, the driver’s work is done.

The Executor end:

After receiving the serialized task, the executor first deserializes it. The bdata contained in the task has type TorrentBroadcast, so accessing torrentBroadcast._value calls its readBroadcastBlock() method. This method first obtains the bdata object and finds that it does not contain the actual data. What then? The executor’s local blockManager is asked whether it holds the data (queried by broadcastId); if it does, the data is read directly from the local blockManager. Otherwise, the local blockManager asks the blockManagerMaster on the driver for the meta information of the data blocks, and once that is obtained, the BT process starts.
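The control flow amounts to a get-or-else-update against the local blockManager. A toy model (a TrieMap stands in for the blockManager; the names are illustrative):

import scala.collection.concurrent.TrieMap

val localCache = TrieMap.empty[Long, Any] // stands in for the executor's blockManager

def readValue[T](broadcastId: Long)(fetch: => T): T =
  localCache.getOrElseUpdate(broadcastId, fetch).asInstanceOf[T] // first caller fetches; later tasks hit the cache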

BT process: first, an array is allocated locally to hold the fetched blocks, val blocks = new Array[BlockData](numBlocks); then the fetch order is shuffled, for (pid <- Random.shuffle(Seq.range(0, numBlocks))). For example, if there are five data blocks in total, the shuffled fetch order might be 3-1-2-4-5. The blocks are then fetched in that scrambled order. Each time a block is fetched, it is stored in the executor’s blockManager, and the blockManagerMaster on the driver is notified that the block has one more storage location. This step is very important: it means blockManagerMaster knows how many copies of each data block exist in the cluster, so when a task on another node later fetches the block, it has multiple sources to choose from and picks one at random. This process continues as in the BitTorrent protocol: as more and more clients finish downloading, there are more and more data block servers, and it becomes a P2P download. Wikipedia has an animation of the BT protocol.
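A sketch of why the shuffled order matters: if every executor fetched block 0 first, they would all hit the same source for the same piece at the same time; shuffling spreads concurrent requests across different blocks (the fetch itself is elided below):

import scala.util.Random

val numBlocks = 5
val fetchOrder = Random.shuffle(Seq.range(0, numBlocks)) // e.g. List(2, 0, 4, 1, 3)
for (pid <- fetchOrder) {
  // hypothetical steps per piece: ask blockManagerMaster for the locations of
  // BroadcastBlockId(id, "piece" + pid), pick one at random, fetch it,
  // store it in the local blockManager, and report the new replica to the driver
  println(s"fetching piece $pid")
}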

After the fetch process completes, the task allocates a large Array[Byte] of the data’s total size, copies all the data blocks into it, and deserializes the bytes in the array to obtain the original data. This process is the reverse of the driver’s serialization.
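The reassembly step, sketched to mirror the blockify sketch above (again illustrative, not Spark’s actual unBlockifyObject):

import java.nio.ByteBuffer

def unblockify(blocks: Array[ByteBuffer]): Array[Byte] = {
  val out = new Array[Byte](blocks.map(_.remaining).sum) // total size of the data
  var offset = 0
  for (b <- blocks) {
    val n = b.remaining
    b.get(out, offset, n) // copy this block's bytes into the combined array
    offset += n
  }
  out // deserializing these bytes yields the original value
}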

Finally, the data is stored in the blockManager of the executor where the task resides, with StorageLevel memory + disk. Obviously, the blockManager now holds the data twice (the full value plus the individual blocks), but once all executors have finished fetching, the individual data blocks can be deleted.

Question: What about broadcasting an RDD?

@Andrew-xia replied: nothing special happens; it just instantiates one copy of the RDD object in each executor.

Discussion

Broadcasting common data is a very useful feature. Hadoop uses DistributedCache for this; for example, -libjars uses DistributedCache to distribute task-dependent jars to each task’s working directory. DistributedCache first uploads the files to HDFS before distributing them. The main problem with this approach is wasted resources: if four mappers from the same job run on one node, that node holds four copies of the common data (one in each task’s working directory). On the other hand, an advantage of broadcasting through HDFS is that the single-point bottleneck is less pronounced, because the common data is split into multiple blocks stored on different nodes; as long as all tasks do not fetch the same block from the same node at the same time, network congestion is not severe.

For Spark, broadcast has to consider not only how to distribute common data, but also how tasks on the same node can share it.

For the first question, Spark designed two broadcast modes: the traditional HttpBroadcast, which has a single-point bottleneck, and the BitTorrent-like TorrentBroadcast. HttpBroadcast uses a traditional client-server HttpServer to transmit the real data, while TorrentBroadcast uses the NIO communication provided by blockManager. The problems with TorrentBroadcast are slow start and memory consumption. Slow start means that at the beginning the data exists only on the driver, and the data-server effect becomes significant only after executors have fetched several rounds of data blocks, after which fetching speeds up. An executor consumes nearly twice the data size in memory when deserializing the fetched blocks. In either mode, the driver consumes twice the data size in memory while chunking the data.
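Back-of-the-envelope numbers for these memory claims (the 1 GB size is hypothetical):

val dataSize  = 1L << 30                               // a 1 GB broadcast value
val blockSize = 4L * 1024 * 1024                       // spark.broadcast.blockSize = 4m
val numBlocks = (dataSize + blockSize - 1) / blockSize // = 256 pieces
// While chunking, the driver briefly holds both the serialized byteArray (~1 GB)
// and the 256 blocks (~1 GB total), so peak usage is roughly 2 GB until the
// byteArray is reclaimed; an executor similarly holds blocks + deserialized value.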

For the second question, each executor contains a blockManager that manages the data stored in that executor. The common data is stored in the blockManager (memory + disk), which guarantees that all tasks executed in the executor share it.

For more on broadcast performance and scalability, see the technical report Performance and Scalability of Broadcast in Spark.

Going further, broadcast could be implemented with a multicast protocol, but UDP-based multicast is not reliable, so some reliability guarantee mechanism would still need to be designed at the application layer.