0x00 Abstract

Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 that has been widely used in the industry.

This series takes you through the source code analysis of Horovod. This article, the sixth in a series, looks at the Horovod background thread architecture.

Because of length limits, this article is published in two parts; please bear with me.

The previous links are as follows:

Horovod (1) — Basics

Horovod (2) — A distributed training framework for deep learning — from the user’s perspective

Horovod (3) — What’s behind Horovodrun

Horovod (4) — Network Basics & Driver

Horovod (5) — fusion framework

0x01 Introduction

Tensor & Operation: to perform ring-allreduce, the execution thread hands the tensor and its operation to the background thread through a series of steps. The execution thread is the training thread; the background thread is the ring-allreduce thread.

Let’s take a look at how this works in the background.

0x02 Design essentials

2.1 The problem

Because computing frameworks tend to use multiple threads to execute the training computation graph, in a multi-node setting we cannot guarantee that the allreduce requests on each node are issued in the same order. Therefore, MPI_Allreduce cannot be used directly.

2.2 The solution

To solve this problem, HVD adopts a master-slave design, where rank 0 is the master node and ranks 1 to n are the worker nodes.

  • The master node coordinates synchronization, ensuring that the allreduce requests for a given tensor are complete and ordered before they are processed.
  • After deciding which tensors are ready, the master sends the names and order of the tensors that can be communicated to every node.
  • Once all nodes know which tensors to communicate and in what order, MPI communication can take place.

First, let’s review synchronous gradient updates: we wait until the gradients of all ranks have been computed, and then perform a global gradient accumulation, which requires message communication across the cluster. HVD does two things for this.

  • In Horovod, each card corresponds to a training process called a rank. For example, if there are 4 cards, the corresponding rank of each process is [0,1,2,3].
  • Coordination: In HVD, Rank0 is used as coordinator (master), and the other processes are workers. Rank0 coordinates the progress of all ranks.
  • Background communication thread: so as not to block normal op computation, HVD creates a background communication thread, which handles message synchronization between ranks and the AllReduce operation.

In Horovod, the training processes are equal participants; each process is responsible both for distributing gradients and for the actual gradient computation. For example, with three workers the gradient is evenly divided into three parts, and the cluster-wide gradient computation and synchronization can be completed in four communication steps.
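To make that count concrete, here is a minimal, self-contained simulation of ring-allreduce among three in-process "workers" (illustrative only, not Horovod code): a reduce-scatter phase takes N-1 = 2 rounds and an allgather phase takes another 2, i.e. 2*(N-1) = 4 communication rounds for N = 3.

// Minimal in-process simulation of ring-allreduce with N = 3 workers.
// Illustrative only; real Horovod exchanges the chunks over MPI / NCCL.
#include <iostream>
#include <vector>

int main() {
  const int N = 3;  // number of workers (ranks)
  // Each worker holds a gradient split into N chunks (one value per chunk here).
  std::vector<std::vector<double>> grad = {
      {1, 2, 3}, {10, 20, 30}, {100, 200, 300}};

  int rounds = 0;
  // Phase 1: reduce-scatter, N-1 rounds. Afterwards worker r owns the fully
  // reduced chunk (r + 1) % N.
  for (int s = 0; s < N - 1; ++s, ++rounds) {
    for (int r = 0; r < N; ++r) {
      int chunk = (r - s + N) % N;  // chunk that worker r sends this round
      int dst = (r + 1) % N;        // ring neighbor
      grad[dst][chunk] += grad[r][chunk];
    }
  }
  // Phase 2: allgather, N-1 rounds. The reduced chunks are circulated until
  // every worker has every reduced chunk.
  for (int s = 0; s < N - 1; ++s, ++rounds) {
    for (int r = 0; r < N; ++r) {
      int chunk = (r + 1 - s + N) % N;  // reduced chunk that worker r forwards
      int dst = (r + 1) % N;
      grad[dst][chunk] = grad[r][chunk];
    }
  }

  std::cout << "communication rounds: " << rounds << "\n";  // prints 4
  for (int r = 0; r < N; ++r) {
    std::cout << "worker " << r << ":";
    for (double v : grad[r]) std::cout << " " << v;  // 111 222 333 on every worker
    std::cout << "\n";
  }
  return 0;
}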

2.3 Coordination

2.3.1 Design

The coordination process is also described in great detail in the documentation; a translation follows.

Coordination currently uses the master-worker paradigm. Rank 0 is the master (the “coordinator”) and the other ranks are workers. Each rank runs a loop in its own background thread, and processing is scheduled in time slices. In each time slice, the following operations are performed:

  • The workers send MPIRequests to the coordinator. An MPIRequest states explicitly what the worker wants to do (which operation, gather or reduce, on which tensor, plus the tensor’s shape and type). After ComputeAsync has executed in a tensor’s collective op, the worker sends an MPIRequest for that tensor.

  • When there are no more tensors to process, the workers will send an empty “DONE” message to the coordinator;

  • The coordinator receives MPIRequests from the workers, as well as from its own TensorFlow ops, and stores them in the request table. The coordinator keeps receiving MPIRequests until it has received MPI_SIZE “DONE” messages.

  • The coordinator collects all tensors that are ready to be reduced or gathered, along with any operations that produced errors. For each tensor or operation, the coordinator sends an MPIResponse to all workers. When there are no more MPIResponses, the coordinator sends a “DONE” response to the workers; if the process is shutting down, it sends a “SHUTDOWN” response instead.

  • Workers listen for MPIResponse messages and perform the required reduce or gather operations one by one until they receive the “DONE” response, at which point the time slice ends. If the received message is “SHUTDOWN” rather than “DONE”, they exit the background loop.

To put it simply:

  • The coordinator collects the MPIRequests of all workers (including itself, since the coordinator also trains) and places them in the request table.
  • After collecting MPI_SIZE “DONE” messages, the coordinator inspects the ready tensors (in message_table), builds a ready_to_reduce list, and then issues MPIResponses telling the processes to start computing (a minimal sketch of this bookkeeping follows the list).
  • Each worker receives the responses and starts the actual computation (executed through op_manager).
  • This is the overall synchronization process. If you enable the Horovod trace log (HOROVOD_LOG_LEVEL=trace), you can observe it directly.
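The sketch below shows that bookkeeping in isolation. The names are hypothetical (the real logic lives in Controller::ComputeResponseList and the message_table): rank 0 records which ranks have requested each tensor, and a tensor becomes ready once every rank has asked for it.

#include <set>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical sketch (names are illustrative, not Horovod's actual classes):
// rank 0 counts which ranks have requested each tensor; a tensor is ready
// once all ranks have asked for it.
class MessageTable {
 public:
  explicit MessageTable(int size) : size_(size) {}

  // Record that `rank` asked to allreduce `tensor_name`.
  // Returns true when every one of the size_ ranks has requested this tensor.
  bool RecordRequest(const std::string& tensor_name, int rank) {
    auto& ranks = requests_[tensor_name];
    ranks.insert(rank);
    return static_cast<int>(ranks.size()) == size_;
  }

  // Tensors that every rank has requested; rank 0 puts these (in a fixed
  // order) into the response it sends to the workers.
  std::vector<std::string> ReadyToReduce() const {
    std::vector<std::string> ready;
    for (const auto& kv : requests_) {
      if (static_cast<int>(kv.second.size()) == size_) {
        ready.push_back(kv.first);
      }
    }
    return ready;
  }

 private:
  int size_;  // number of ranks in the job
  std::unordered_map<std::string, std::set<int>> requests_;
};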

2.3.2 Implementation

Let’s look at the implementation.

HVD uses a master-slave design: rank 0 serves as the coordinator (master) and the remaining processes are workers; rank 0 coordinates the progress of all ranks. Each worker node has a message queue, and the master node has a message map (message_table) in addition to its message queue.

Whenever a communication request arrives from the computing framework, HVD does not execute MPI directly; instead, it encapsulates the request as a message and pushes it onto its own message queue.

  • Overall, a Request/Response message mechanism is used.
  • When the gradient of an op has been computed and is waiting for the global AllReduce, the rank wraps up a Request and ComputeResponseList is called to send it: a ready tensor is placed in this rank’s message_queue, and each rank’s background thread periodically drains its message_queue and sends the requests from that queue to rank 0. Because MPI is synchronous, each node blocks until the MPI operation completes.
  • Rank 0 keeps a message_table that stores the request information from the other ranks; rank 0 processes all requests in the message_table.
  • When rank 0 has received the AllReduce requests for an op from every rank, that tensor is ready on all ranks. The tensor both needs to be and can be communicated, since every node has already issued a communication request for it.
  • Once the tensors have been decided, the master sends the names and order of the tensors that can be communicated to each node.
    • Rank 0 picks out all the tensors that are ready for MPI communication;
    • Tensors that do not yet meet the requirements stay in the message map, waiting until the conditions are met;
    • When there are ready tensors, rank 0 sends a Response to the other ranks, indicating that the local gradients of the current op & tensor are ready on every rank and the collective operation (for example AllReduce) can be performed on that tensor.
  • At this point, every node knows the tensors and order of the upcoming MPI operations, and the MPI communication proceeds in order.

The logic is as follows:

           Rank 0                          Rank 1         Rank 2
             +                               +              +
             |                               |              |
             |                               |              |
             |                               |              |
             +            Tensor 1 request   |              |
        message_table  <---------------------+              |
             +                               |              |
             |                               |              |
             |                               |              |
             v                               |              |
                                             |              |
 message_table[tensor 1]                     |              |
             +                               |              |
             |                               |              |
             |           Tensor 1 request    |              |
             | <--------------------------------------------+
             +                               |              |
 message_table[tensor 1, tensor 1]           |              |
             +                               |              |
             |                               |              |
             |           Tensor 1 request    |              |
             +-------------------------+     |              |
             |                         |     |              |
             |                         |     |              |
             | <-----------------------+     |              |
             |                               |              |
             v                               |              |
message_table[tensor 1, tensor 1, tensor 1]  |              |
             +                               |              |
             |                               |              |
             |                               |              |
             |          Tensor 1 response    |              |
             +-----------------------------> |              |
             |                               |              |
             |          Tensor 1 response    |              |
             +--------------------------------------------> |
             |                               |              |
             |  Tensor 1 response            |              |
             +-------------------------v     |              |
             |                         |     |              |
             |                         |     |              |
             | <-----------------------+     |              |
             |                               |              |
             |                               |              |
             v                               v              v

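The diagram above shows rank 0 collecting Tensor 1 requests from every rank (including itself) and then broadcasting a response. For orientation, here is a simplified sketch of what such request and response messages might carry; the field names are illustrative assumptions, while Horovod’s real Request and Response classes (in horovod/common/message.h) carry more fields.

#include <cstdint>
#include <string>
#include <vector>

// Simplified sketch of the messages exchanged above (illustrative only).
struct TensorRequest {
  enum class Type { ALLREDUCE, ALLGATHER, BROADCAST };
  Type type;                          // what the worker wants to do
  int request_rank;                   // which rank sent this request
  std::string tensor_name;            // key used in rank 0's message_table
  std::vector<int64_t> tensor_shape;  // shape, so peers can sanity-check
  int tensor_dtype;                   // numeric code for the element type
};

struct TensorResponse {
  enum class Type { ALLREDUCE, ALLGATHER, BROADCAST, ERROR, DONE, SHUTDOWN };
  Type type;
  // Tensors that are ready on every rank, in the order all ranks must
  // process them so that the collective calls match up.
  std::vector<std::string> tensor_names;
  std::string error_message;  // set when type == ERROR
};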

2.4 Background Thread

Each rank has two threads, and we usually initialize HVD using hvd.init() in a Python file, essentially opening a background thread and an MPI thread.

  • The Execution thread (MPI thread) is used to do machine learning calculations.
  • The background thread is used for message synchronization between ranks and for the AllReduce operations. Baidu’s design has an MPI background thread, and Horovod follows that design; its background thread is named BackgroundThreadLoop.

2.4.1 Design

In terms of design thinking, Baidu wrote it out very clearly in the source comments (tensorflow-allreduce-master/tensorflow/contrib/mpi_collectives/mpi_ops.cc); a rough translation follows.

The MPI Background Thread is for coordinating all MPI processes and tensor reduction. This design is based on several considerations:

  1. Some MPI implementations require that all MPI calls be in a separate thread. Because Tensorflow may use several threads to process diagrams, we must use our own specific thread to process MPI;
  2. For some errors (such as mismatched shapes or types), MPI does not have a well-defined way to handle them, but we want to handle them gracefully. To do that, the MPI processes need to know the shapes and types of the tensors on the other processes;
  3. The MPI reductions and gathers may run in parallel with other operations. Because MPI uses an internal (inaccessible) GPU stream that is separate from the TF GPUDevice streams, we cannot explicitly synchronize memcpys or kernels with it. Therefore, MPIAllreduce and MPIAllgather must be AsyncOpKernels to ensure a proper ordering of memcpys and kernels.
  4. Note: We cannot ensure that all MPI processes reduce their Tensors in the same order. Therefore, there must be a way to ensure that reduction memcpys and Kernels can be performed simultaneously across all ranks. We use rank ID 0 as a coordinator to gather and trigger the reduction operations that are ready to be performed.

In short:

  1. Some MPI implementations require that all MPI calls be in a separate thread.
  2. In order to handle errors, the MPI process needs to know the shape and type of tensor on other processes.
  3. MPIAllreduce and MPIAllgather must be AsyncOpKernels to ensure a proper sequence of memcpys or Kernels.

Therefore, a background thread is necessary. The message_queue and horovod_global.tensor_table are processed in Horovod’s background thread, BackgroundThreadLoop.

2.4.2 Implementation

At the lowest level, AllReduce is registered as an op, and in ComputeAsync the compute request is pushed into a queue. That queue is processed by a single, shared background thread.
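A minimal sketch of that producer/consumer structure follows. The names are hypothetical and the code is only an illustration of the pattern (in Horovod itself the pieces are spread across ComputeAsync, HorovodGlobalState and BackgroundThreadLoop): compute threads only enqueue work, while one background thread wakes up every cycle, drains the queue, and performs the communication.

#include <chrono>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

struct TensorWork {
  std::string name;
  std::function<void()> done_callback;  // stands in for the op's async callback
};

class BackgroundLoop {
 public:
  BackgroundLoop() : thread_([this] { Run(); }) {}
  ~BackgroundLoop() {
    {
      std::lock_guard<std::mutex> lk(mu_);
      shutdown_ = true;
    }
    thread_.join();
  }

  // Called from the framework's compute threads (e.g. inside ComputeAsync).
  void Enqueue(TensorWork work) {
    std::lock_guard<std::mutex> lk(mu_);
    queue_.push(std::move(work));
  }

 private:
  void Run() {
    const auto cycle_time = std::chrono::milliseconds(5);  // cf. HOROVOD_CYCLE_TIME
    for (;;) {
      std::this_thread::sleep_for(cycle_time);
      std::queue<TensorWork> batch;
      {
        std::lock_guard<std::mutex> lk(mu_);
        if (shutdown_ && queue_.empty()) return;
        std::swap(batch, queue_);  // drain everything queued during this cycle
      }
      while (!batch.empty()) {
        // Real code would coordinate with rank 0 and run the collective here.
        batch.front().done_callback();
        batch.pop();
      }
    }
  }

  std::mutex mu_;
  std::queue<TensorWork> queue_;
  bool shutdown_ = false;
  std::thread thread_;  // declared last so the loop sees initialized members
};

int main() {
  BackgroundLoop loop;
  loop.Enqueue({"grad/layer1", [] { std::cout << "layer1 reduced\n"; }});
  loop.Enqueue({"grad/layer2", [] { std::cout << "layer2 reduced\n"; }});
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  return 0;  // ~BackgroundLoop sets shutdown_ and joins the thread
}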

During initialization, the background thread uses the process-wide shared global state to create the objects it needs in its own memory and makes some logical decisions, such as whether to use hierarchical AllReduce, whether to enable autotuning, and so on.

During initialization, some important objects are constructed, such as various controllers.

Let’s take a closer look at background threads.

0x03 Helper functions

Let’s start with some helper functions.

3.1 How to determine whether the current process is the coordinator

Because the background thread code is shared by all workers, it needs to distinguish rank 0 from the other workers in order to execute different code paths.

is_coordinator_ is used to determine whether the current process is rank 0.

is_coordinator_ is assigned as follows:

void MPIController::DoInitialization() {
  ...
  // Get MPI rank to determine if we are rank zero.
  MPI_Comm_rank(mpi_ctx_.mpi_comm, &rank_);
  is_coordinator_ = rank_ == 0;
  ...
}

An example of how is_coordinator_ is used: when the parameters are synchronized, they are retrieved from rank 0 and broadcast to the other ranks, the workers:

void Controller::SynchronizeParameters() {
  ParameterManager::Params param;
  if (is_coordinator_) { // rank 0 performs the operation
    param = parameter_manager_.GetParams();
  }

  void* buffer = (void*)(&param);
  size_t param_size = sizeof(param);
  Bcast(buffer, param_size, 0, Communicator::GLOBAL);

  if (!is_coordinator_) { // the workers perform the operation
    parameter_manager_.SetParams(param);
  }
}

3.2 Coordinating caches and information

In the ComputeResponseList function, the following call coordinates the cache, working out which tensors are shared by all ranks.

CoordinateCacheAndState(cache_coordinator);

This mainly relies on the cache_coordinator’s sync operation.

void Controller::CoordinateCacheAndState(CacheCoordinator& cache_coordinator) {
  // Sync cache and state information across workers.
  cache_coordinator.sync(shared_from_this(), timeline_enabled_);
}

3.2.1 Computing the common tensors

The CoordinateCacheAndState function is as follows:

  • Each worker assembles its own bit vector;
  • CrossRankBitwiseAnd is used to work out the tensors common to all ranks;
  • CrossRankBitwiseOr is used to work out the commonly invalid tensors.
void CacheCoordinator::sync(std::shared_ptr<Controller> controller,
                            bool timeline_enabled) {

  // Resize and initialize bit vector.
  int nbits = num_active_bits_ + NUM_STATUS_BITS;
  int count = (nbits + sizeof(long long) * CHAR_BIT - 1) /
              (sizeof(long long) * CHAR_BIT);
  ...

  // Each worker assembles its own bit vector.
  // For each cache hit on this worker, flip associated bit in bit vector.
  for (auto bit : cache_hits_) {
    int shifted_bit = bit + NUM_STATUS_BITS;
    int shift = shifted_bit / (sizeof(long long) * CHAR_BIT);
    bitvector_[shift] |=
        (1ull << (shifted_bit % (sizeof(long long) * CHAR_BIT)));
    if (timeline_enabled) {
      // Set corresponding bit in extended section for timeline if needed.
      bitvector_[count + shift] ^=
          (1ull << (shifted_bit % (sizeof(long long) * CHAR_BIT)));
    }
  }

  // Work out the tensors common to all ranks.
  // Global AND operation to get intersected bit array.
  controller->CrossRankBitwiseAnd(bitvector_, fullcount);

  // Search for flipped bits to populate common cache hit set. There will never
  // be invalid bits in this set.
  cache_hits_.clear();
  for (int i = 0; i < count; ++i) {
    int shift = i * sizeof(long long) * CHAR_BIT;
    long long ll = bitvector_[i];
    while (ll) {
      int idx = __builtin_ffsll(ll);
      int shifted_bit = shift + idx - 1;
      cache_hits_.insert(shifted_bit - NUM_STATUS_BITS);
      ll &= ~(1ull << (idx - 1));
    }
  }

  ...

  // If any worker has invalid cache entries, communicate invalid bits across
  // workers using a second bit-wise allreduce operation.
  if (invalid_in_queue_) {
    std::memset(&bitvector_[0], 0, count * sizeof(long long));
    for (auto bit : invalid_bits_) {
      int shift = bit / (sizeof(long long) * CHAR_BIT);
      bitvector_[shift] |= (1ull << (bit % (sizeof(long long) * CHAR_BIT)));
    }

    // Global OR operation to get common invalid bits.
    controller->CrossRankBitwiseOr(bitvector_, count);

    // Search for flipped bits to populate common invalid bit set.
    invalid_bits_.clear();
    for (int i = 0; i < count; ++i) {
      int shift = i * sizeof(long long) * CHAR_BIT;
      long long ll = bitvector_[i];
      while (ll) {
        int idx = __builtin_ffsll(ll);
        int bit = shift + idx - 1;
        invalid_bits_.insert(bit);
        ll &= ~(1ull << (idx - 1));
      }
    }
  }

  synced_ = true;
}

3.2.2 The MPI operations

CrossRankBitwiseAnd calls MPI_Allreduce with the MPI_BAND operation to merge the bit vectors across ranks.

void MPIController::CrossRankBitwiseAnd(std::vector<long long>& bitvector,
                                        int count) {
  int ret_code = MPI_Allreduce(MPI_IN_PLACE, bitvector.data(), count,
                               MPI_LONG_LONG_INT, MPI_BAND, mpi_ctx_.mpi_comm);
}
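As a toy illustration of why a bitwise AND across ranks yields the common cache hits (this is plain local code, not an MPI program): each rank sets one bit per cached tensor, and the AND that MPI_BAND computes keeps only the bits set on every rank.

#include <bitset>
#include <iostream>
#include <vector>

// Toy illustration (no MPI involved): AND-ing the per-rank bit vectors keeps
// only the bits that every rank has set, i.e. the cache entries common to all.
int main() {
  // Bit i set <=> this rank has tensor i in its response cache.
  std::vector<unsigned long long> rank_bits = {
      0b10110ull,   // rank 0 cached tensors 1, 2, 4
      0b10011ull,   // rank 1 cached tensors 0, 1, 4
      0b11010ull};  // rank 2 cached tensors 1, 3, 4

  unsigned long long common = ~0ull;         // identity element for AND
  for (auto bits : rank_bits) common &= bits;  // what MPI_BAND computes

  std::cout << "common cache hits: " << std::bitset<5>(common) << "\n";
  // prints 10010 -> tensors 1 and 4 are cached on every rank
  return 0;
}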

3.3 MPIContext

mpi_context is created when the C++ code is loaded, together with the other contexts (nccl_context, gpu_context). It maintains the environment information and settings needed for MPI communication on the node, such as:

  • Three MPI communicators: mpi_comm, local_comm and cross_comm, responsible respectively for Horovod’s global MPI transfers, intra-node transfers, and inter-node hierarchical transfers (mainly used for hierarchical AllReduce).
  • mpi_float16_t: the MPI data type Horovod uses to transfer float16 data.
  • mpi_float16_sum: the sum operation corresponding to float16.

Whenever MPI is used in Horovod, these communicators are used for the data transfer.
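As a hedged sketch of how such communicators are typically derived (this is a generic MPI pattern, not Horovod’s actual MPIContext code): local_comm groups the ranks on the same host, and cross_comm groups the ranks that share the same local rank across hosts.

#include <mpi.h>

// Hedged sketch (not Horovod's actual code) of deriving the three
// communicators described above from MPI_COMM_WORLD.
int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);

  int world_rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

  // Global communicator: every rank in the job (the role of mpi_comm).
  MPI_Comm global_comm;
  MPI_Comm_dup(MPI_COMM_WORLD, &global_comm);

  // Local communicator: ranks sharing the same host / shared memory
  // (the role of local_comm, i.e. intra-node transfers).
  MPI_Comm local_comm;
  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, world_rank,
                      MPI_INFO_NULL, &local_comm);

  int local_rank;
  MPI_Comm_rank(local_comm, &local_rank);

  // Cross communicator: one rank per node with the same local rank
  // (the role of cross_comm in hierarchical allreduce).
  MPI_Comm cross_comm;
  MPI_Comm_split(MPI_COMM_WORLD, /*color=*/local_rank, /*key=*/world_rank,
                 &cross_comm);

  MPI_Comm_free(&cross_comm);
  MPI_Comm_free(&local_comm);
  MPI_Comm_free(&global_comm);
  MPI_Finalize();
  return 0;
}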

3.4 Parameter_manager

parameter_manager is GlobalState’s manager for the parameters that regulate Horovod’s performance. It is initialized in BackgroundThreadLoop together with the other GlobalState members, and the following environment variables are then read and applied.

  • HOROVOD_FUSION_THRESHOLD: the size of the fused transfer buffer, 64 MB by default. If a slice is too large, transfers cannot be pipelined well.

  • HOROVOD_CYCLE_TIME: the sleep time of the background loop, 5 ms by default. Ideally, the time taken by the rest of the RunLoopOnce logic plus HOROVOD_CYCLE_TIME should be exactly the time of one forward pass and one backward pass: if the sleep is too long, the front end is left waiting for RunLoopOnce to wake up; if it is too short, RunLoopOnce runs over and over again with no new elements in tensor_queue, which is just a wasted pass.

  • HOROVOD_CACHE_CAPACITY: the cache size, which may be chosen according to the number of model layers.

  • HOROVOD_HIERARCHICAL_ALLGATHER: whether to use the hierarchical allgather approach, etc.

parameter_manager also provides automatic tuning of these parameters, enabled through parameter_manager.SetAutoTuning. Once enabled, different parameter combinations are tried for communication during the first few batches, after which the values converge to the best combination found.
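A minimal sketch of how such environment-driven defaults can be read follows; the helper is hypothetical (not Horovod’s parsing code), and the defaults mirror the values mentioned above (64 MB fusion threshold, 5 ms cycle time).

#include <chrono>
#include <cstdlib>
#include <iostream>
#include <string>

// Hypothetical helper (not Horovod's code) for reading the tuning knobs
// described above from the environment, with the documented defaults.
long long EnvToInt64(const char* name, long long default_value) {
  const char* value = std::getenv(name);
  return value ? std::atoll(value) : default_value;
}

int main() {
  // 64 MB default fusion buffer; larger values reduce pipelining.
  long long fusion_threshold =
      EnvToInt64("HOROVOD_FUSION_THRESHOLD", 64LL * 1024 * 1024);
  // 5 ms default sleep between background-loop iterations.
  std::chrono::milliseconds cycle_time(
      EnvToInt64("HOROVOD_CYCLE_TIME", 5));

  std::cout << "fusion threshold: " << fusion_threshold << " bytes\n"
            << "cycle time: " << cycle_time.count() << " ms\n";
  return 0;
}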

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

If you want to be notified when new articles are published, or to browse the technical material I recommend, please follow the account.
