Shuffle

Shuffle refers to data reorganization and is not unique to Spark. For the shuffle process in MapReduce, see MapReduce Shuffle. Shuffle in Spark is more involved, but at its core it answers two questions: how the output of one task is written to disk, and how the next task fetches that data for processing.

Spark Shuffle

MapReduce divides tasks into Map tasks and Reduce tasks, and shuffle is defined as the operations performed after the Map function and before the Reduce function. In Spark, jobs are split into stages for execution according to ShuffleDependency. Shuffle can be simply understood as the previous stage writing its processed data to disk and the next stage pulling that data.

In Spark, shuffle is classified into shuffle write and shuffle read. Together, the two are the complete shuffle process:

ShuffleWriter

The ShuffleWriter is responsible for writing shuffle data to disk. Inside Spark there are three ShuffleWriter implementations: UnsafeShuffleWriter, BypassMergeSortShuffleWriter, and SortShuffleWriter.

Spark splits stages according to ShuffleDependency. A new ShuffleDependency is created by the shuffle operator and carries a ShuffleHandle attribute. ShuffleHandle has three subtypes: BypassMergeSortShuffleHandle, SerializedShuffleHandle, and BaseShuffleHandle, which correspond one-to-one with the three ShuffleWriters.

The process for obtaining ShuffleWriter is as follows:

The main code of org.apache.spark.shuffle.sort.SortShuffleManager.registerShuffle():

  /** Obtains a [[ShuffleHandle]] to pass to tasks. */
  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }

And org.apache.spark.shuffle.sort.SortShuffleManager.getWriter():

  /** Get a writer for a given partition. Called on executors by map tasks. */
  override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V] = {
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    }
  }

BypassMergeSortShuffleWriter

The ShuffleWriter is chosen by pattern matching on the ShuffleHandle. BypassMergeSortShuffleWriter corresponds to BypassMergeSortShuffleHandle, which requires two conditions to be met:

  1. There is no map-side combine (mapSideCombine is false), for example groupByKey;
  2. The number of tasks in the downstream stage (the number of tasks in a stage is determined by the number of partitions of its final RDD) is less than or equal to spark.shuffle.sort.bypassMergeThreshold, which defaults to 200 (a hedged example follows this list).
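As an illustration of these two conditions, the following sketch shows a job shape that would typically take the bypass path under default settings; the class name, master, and numbers are just an example, not anything prescribed by Spark:

    import org.apache.spark.{SparkConf, SparkContext}

    object BypassExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("bypass-example")
          .setMaster("local[2]")
          // 200 is already the default; set here only to make the condition explicit
          .set("spark.shuffle.sort.bypassMergeThreshold", "200")
        val sc = new SparkContext(conf)

        // groupByKey has no map-side combine, and 100 <= 200 output partitions,
        // so this shuffle is expected to take the bypass path.
        sc.parallelize(1 to 1000)
          .map(i => (i % 10, i))
          .groupByKey(100)
          .count()

        sc.stop()
      }
    }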

The logic of BypassMergeSortShuffleWriter is relatively simple: it creates one temporary file per downstream partition and writes each record to the file of its partition. All of these files are eventually concatenated into a single data file, together with an index file that records each partition's start offset and length in bytes.
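A minimal sketch of what that index file makes possible (this is illustrative code, not Spark's IndexShuffleBlockResolver; partitionLengths is a hypothetical input holding the bytes written per partition):

    object IndexFileSketch {
      // The index stores cumulative offsets, so partition i occupies bytes
      // [offsets(i), offsets(i + 1)) of the single merged data file.
      def partitionRange(partitionLengths: Array[Long], partition: Int): (Long, Long) = {
        val offsets = partitionLengths.scanLeft(0L)(_ + _) // what the index file records
        (offsets(partition), offsets(partition + 1))
      }
      // e.g. partitionRange(Array(10L, 0L, 25L), 2) == (10L, 35L)
    }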

UnsafeShuffleWriter

The ShuffleHandle corresponding to UnsafeShuffleWriter is SerializedShuffleHandle. SerializedShuffleHandle must meet the following three conditions:

  1. The serializer of the ShuffleDependency supports relocation of serialized objects, i.e. supportsRelocationOfSerializedObjects is true;
  2. No aggregator is defined; reduceByKey therefore does not qualify, since it defines an aggregator;
  3. The number of partitions in the downstream stage does not exceed 1 << 24 (the maximum partition id is 16777215); see the condensed check after this list.
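The three conditions can be restated compactly as below. This is illustrative only: in Spark itself the check lives in SortShuffleManager.canUseSerializedShuffle, and some of the members it reads are package-private, so the sketch takes plain booleans instead:

    object SerializedShuffleEligibility {
      def eligible(
          serializerSupportsRelocation: Boolean, // condition 1
          aggregatorDefined: Boolean,            // condition 2
          numPartitions: Int): Boolean =         // condition 3
        serializerSupportsRelocation && !aggregatorDefined && numPartitions <= (1 << 24)
    }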

UnsafeShuffleWriter writes data directly to on-heap or off-heap memory via the JDK's sun.misc.Unsafe; which mode is used is controlled by a field called tungstenMemoryMode in the source code. To use off-heap memory, three conditions must be met:

  1. Off-heap memory is enabled via spark.memory.offHeap.enabled, which defaults to false;
  2. The off-heap memory size, spark.memory.offHeap.size, is greater than zero;
  3. The platform Spark runs on supports unaligned memory access: architectures such as ppc64le and ppc64 (on Linux), or those matching ^(i[3-6]86|x86(_64)?|x64|amd64|aarch64)$ (a configuration sketch follows this list).
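A small configuration sketch for the first two conditions; the size value is only an example, and condition 3 is detected at runtime rather than configured:

    import org.apache.spark.SparkConf

    object OffHeapConfSketch {
      val conf: SparkConf = new SparkConf()
        .set("spark.memory.offHeap.enabled", "true") // condition 1 (default: false)
        .set("spark.memory.offHeap.size", "2g")      // condition 2 (must be > 0)
      // Condition 3 (a supported architecture) cannot be turned on by configuration.
    }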

So by default, UnsafeShuffleWriter stores data in on-heap memory. UnsafeShuffleWriter stores data in memory pages, and the page size is somewhat involved to calculate:

The calculation above boils down to Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()), where MAXIMUM_PAGE_SIZE_BYTES is 128 MB. Normally the page size works out to 64 MB, and it can also be set explicitly with spark.buffer.pageSize. When memory usage exceeds a certain limit, the data is spilled to disk:
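With default values the page-size arithmetic above works out as follows (a rough sketch, not Spark code; the 64 MB figure is the typical memory-manager value mentioned above):

    object PageSizeArithmetic {
      val maximumPageSizeBytes: Long = 1L << 27                // 128 MB cap (PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES)
      val memoryManagerPageSizeBytes: Long = 64L * 1024 * 1024 // typical value from the memory manager
      val pageSizeBytes: Long = math.min(maximumPageSizeBytes, memoryManagerPageSizeBytes) // = 64 MB
    }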

The overall data writing process of UnsafeShuffleWriter is as follows:

Memory page allocation, for example, ultimately goes through the TaskMemoryManager; the details of Spark memory management are skipped here. The general steps are as follows:

  1. UnsafeShuffleWriter takes a record and computes its partition id from the key (for a hash partitioner this is a hash of the key);

  2. The key and value are serialized as bytes into a serBuffer (default size 1 MB), and the buffer's underlying byte array is used as the recordBase;

  3. ShuffleExternalSorter writes the record into a memory page; depending on the Tungsten memory mode, the on-heap and off-heap writes differ slightly:

    1. On-heap, data is stored in byte arrays. Since Unsafe manipulates memory directly, the write must skip the array's 16-byte object header before the data is written;
    2. The record length is written first, then the recordBase data is copied in; each write advances the pageCursor to the latest position;
    3. The off-heap write logic is the same, except that data is written directly to off-heap memory, where there is no object header to skip.
  4. After a record is written to a MemoryBlock, its index information is recorded, consisting of the partition number and the record address (see the packing sketch after this list):

    1. The partition number is stored in a long, shifted 40 bits to the left, which is why the downstream stage can have at most 2^24 partitions;

    2. The record address encodes the number of the memory page holding the data and the offset within that page;

    3. When data is spilled, the index entries are sorted so that records are ordered by partition.
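A minimal sketch of the pointer packing described above, assuming the 24-bit partition / 13-bit page / 27-bit offset layout implied by the 40-bit shift (illustrative only, not Spark's PackedRecordPointer):

    object PointerPackingSketch {
      // [24-bit partition id][13-bit page number][27-bit offset within the page]
      def pack(partitionId: Int, pageNumber: Int, offsetInPage: Long): Long =
        (partitionId.toLong << 40) |            // partition shifted 40 bits left
          (pageNumber.toLong << 27) |           // page number in the middle 13 bits
          (offsetInPage & ((1L << 27) - 1))     // offset kept in the low 27 bits

      def partitionOf(pointer: Long): Int = (pointer >>> 40).toInt

      def main(args: Array[String]): Unit = {
        val p = pack(partitionId = 3, pageNumber = 1, offsetInPage = 128L)
        // Because the partition occupies the high bits, sorting the raw longs
        // automatically groups records by partition.
        println(partitionOf(p)) // 3
      }
    }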

SortShuffleWriter

The ShuffleHandle corresponding to SortShuffleWriter is BaseShuffleHandle, which is used when the conditions of the other two handles are not met. SortShuffleWriter has two code paths, depending on whether an aggregator is defined. The overall flow is as follows:

shouldCombine (an aggregator is defined):

  1. AppendOnlyMap holds an array whose default size is 64 * 2, i.e. it can store 64 key-value pairs; this flat array is used as the map;
  2. When a record is written, the partition number is computed from the key, and a Tuple2 of the partition number and the key becomes the new key: k = (partition, key);
  3. The slot is located via rehash(k.hashCode) & mask, where mask is the key-value capacity minus 1; the key lives at data(2 * pos) and the value at data(2 * pos + 1);
  4. If the key slot at pos is null, this key is seen for the first time: the aggregator's update function is called as newValue = updateFunc(false, null.asInstanceOf[V]) and the result is written to data(2 * pos + 1);
  5. If the slot at pos holds the same key, the update function is called as newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V]) and the new value is written back to data(2 * pos + 1);
  6. If the slot at pos holds a different key (a hash collision), linear probing is used to find the next slot;
  7. When the number of key-value pairs exceeds 70% of the current capacity, the array is grown by a factor of 2 and all keys are rehashed (see the sketch below).
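The following is a minimal sketch of this flat-array map with linear probing and an update function. It is not Spark's AppendOnlyMap (the rehash here is a simple bit spreader standing in for the murmur3 rehash Spark uses), but it follows the steps listed above:

    class TinyAppendOnlyMap[K, V](initialCapacity: Int = 64) {
      private var capacity = initialCapacity
      private var mask = capacity - 1
      private var data = new Array[AnyRef](2 * capacity) // keys at 2*pos, values at 2*pos+1
      private var curSize = 0

      private def rehash(h: Int): Int = h ^ (h >>> 16)

      def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
        val k = key.asInstanceOf[AnyRef]
        var pos = rehash(k.hashCode) & mask
        while (true) {
          val curKey = data(2 * pos)
          if (curKey == null) {                      // empty slot: first record for this key
            val newValue = updateFunc(false, null.asInstanceOf[V])
            data(2 * pos) = k
            data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
            curSize += 1
            if (curSize > 0.7 * capacity) grow()     // grow at 70% load, factor 2
            return newValue
          } else if (curKey == k) {                  // same key: merge with existing value
            val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
            data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
            return newValue
          } else {                                   // collision: linear probing
            pos = (pos + 1) & mask
          }
        }
        throw new IllegalStateException("unreachable")
      }

      private def grow(): Unit = {
        val old = data
        capacity *= 2
        mask = capacity - 1
        data = new Array[AnyRef](2 * capacity)
        curSize = 0
        var i = 0
        while (2 * i < old.length) {
          val k = old(2 * i)
          if (k != null) {                           // re-insert with the already-merged value
            var pos = rehash(k.hashCode) & mask
            while (data(2 * pos) != null) pos = (pos + 1) & mask
            data(2 * pos) = k
            data(2 * pos + 1) = old(2 * i + 1)
            curSize += 1
          }
          i += 1
        }
      }
    }

For example, calling changeValue((0, "a"), (hadValue, old) => if (hadValue) old + 1 else 1) repeatedly would count records per (partition, key), which is the kind of map-side aggregation shouldCombine enables.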

!shouldCombine (no aggregator defined):

  1. PartitionedPairBuffer holds a reference array named data, whose default size is 64 * 2, i.e. it can store 64 key-value pairs;
  2. When a record is written, the partition number is computed from the key, and a Tuple2 of the partition number and the key becomes the new key: k = (partition, key);
  3. The key-value pair is then simply appended to data, and the record count is incremented by 1 for each record;
  4. When data is full, its capacity is doubled (growth factor 2), with System.arraycopy used to move the old contents into the new array (see the sketch below).
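A minimal sketch of this append-and-grow behaviour (illustrative only, not Spark's PartitionedPairBuffer):

    class TinyPairBuffer[K, V](initialCapacity: Int = 64) {
      private var capacity = initialCapacity
      private var data = new Array[AnyRef](2 * capacity) // keys at 2*i, values at 2*i+1
      private var curSize = 0

      def insert(partition: Int, key: K, value: V): Unit = {
        if (curSize == capacity) grow()
        data(2 * curSize) = (partition, key)             // the new key is (partition, key)
        data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
        curSize += 1                                      // record count +1
      }

      private def grow(): Unit = {
        capacity *= 2                                     // growth factor 2
        val newData = new Array[AnyRef](2 * capacity)
        System.arraycopy(data, 0, newData, 0, 2 * curSize)
        data = newData
      }
    }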

When less memory is granted than was requested, or the in-memory data grows beyond the memory threshold myMemoryThreshold, the data is spilled to disk. The initial value of myMemoryThreshold is controlled by spark.shuffle.spill.initialMemoryThreshold, which defaults to 5 MB, and it changes as the computation proceeds:
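A hedged sketch of this spill decision, loosely modelled on the behaviour described above; acquireMemory is a hypothetical stand-in for asking the task memory manager for more execution memory:

    object SpillSketch {
      var myMemoryThreshold: Long = 5L * 1024 * 1024 // spark.shuffle.spill.initialMemoryThreshold

      def shouldSpill(currentMemory: Long, acquireMemory: Long => Long): Boolean = {
        if (currentMemory >= myMemoryThreshold) {
          // Try to claim enough memory to roughly double the collection before spilling.
          val amountToRequest = 2 * currentMemory - myMemoryThreshold
          myMemoryThreshold += acquireMemory(amountToRequest)
          // If less was granted than needed, we are still over the threshold: spill.
          currentMemory >= myMemoryThreshold
        } else {
          false
        }
      }
    }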

At the end, the spilled temporary files are merged and sorted into a single file so that records are ordered by partition.

ShuffleReader

Spark has only one ShuffleReader, BlockStoreShuffleReader, which handles data pulling in all cases. BlockStoreShuffleReader splits the blocks to be fetched into remote blocks and local blocks. Remote blocks are pulled to the local node through NettyBlockTransferService and wrapped in an iterator, which is then converted into different iterators depending on various conditions. The overall process is as follows:

  1. BlockStoreShuffleReader first creates a ShuffleBlockFetcherIterator;

  2. ShuffleBlockFetcherIterator compares each block's executorId with the current executorId to separate local blocks from remote blocks, and then packs the remote blocks into remoteRequests of a bounded size; by default each request pulls at most 48 MB / 5 of data (spark.reducer.maxSizeInFlight / 5; see the sketch at the end of this section);

  3. NettyBlockTransferService then pulls the data from the remote ends to the local node. Because the blocks are spread across N requests, data can be fetched from several remote ends in parallel, making full use of the local bandwidth;

  4. As results arrive, they are placed into a LinkedBlockingQueue[FetchResult];

  5. After the ShuffleBlockFetcherIterator is created, it is wrapped into further iterators according to the shuffle dependency's settings, such as whether an aggregator is defined and whether a key ordering is defined, and the final iterator is handed to the task;

  6. Data flows through Spark as nested iterators: when the downstream RDD is iterated, its compute method is called, which obtains the ShuffleReader's iterator to pull and process the shuffle data, for example org.apache.spark.rdd.ShuffledRDD:

      override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
        val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
        // Get the ShuffleReader via getReader and call read()
        SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
          .read()
          .asInstanceOf[Iterator[(K, C)]]
      }
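Finally, a hedged sketch of the request grouping mentioned in step 2 above: remote blocks are packed into requests whose size is bounded by spark.reducer.maxSizeInFlight / 5 (48 MB / 5 by default). The (blockId, size) pairs are hypothetical inputs, and this is illustrative code, not ShuffleBlockFetcherIterator itself:

    import scala.collection.mutable.ArrayBuffer

    object FetchRequestGrouping {
      def groupIntoRequests(
          blocks: Seq[(String, Long)],
          maxSizeInFlight: Long = 48L * 1024 * 1024): Seq[Seq[(String, Long)]] = {
        val targetRequestSize = math.max(maxSizeInFlight / 5, 1L) // 48 MB / 5 by default
        val requests = ArrayBuffer.empty[Seq[(String, Long)]]
        var current = ArrayBuffer.empty[(String, Long)]
        var currentSize = 0L
        for (block <- blocks) {
          current += block
          currentSize += block._2
          if (currentSize >= targetRequestSize) { // close this request, start a new one
            requests += current.toSeq
            current = ArrayBuffer.empty[(String, Long)]
            currentSize = 0L
          }
        }
        if (current.nonEmpty) requests += current.toSeq
        requests.toSeq
      }
    }

Splitting the blocks this way is what allows data to be pulled from several remote executors concurrently while keeping the amount of in-flight data bounded.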