0x00 Abstract

Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 that has been widely used in the industry.

This series takes you through the source code of Horovod. This article, the seventh in the series, looks at how Horovod integrates with TensorFlow.

The previous links are as follows:

Horovod (1) — Basics

Horovod (2) — A distributed training framework for deep learning — from the user’s perspective

Horovod (3) — What’s behind Horovodrun

Horovod (4) — Network Basics & Driver

Horovod (5) — fusion framework

Horovod (6) — Background thread architecture

We need a few questions or design points to guide the analysis; since readers may not have read the other articles in this series, some of the questions are repeated here:

  • The first technical difficulty: how does Horovod obtain the gradients from TF's execution flow for processing?

    • In TensorFlow 1.x, a deep learning computation is represented as a graph that is interpreted and executed by the TensorFlow runtime. For Horovod to obtain each process's gradients and AllReduce them, it has to hook into the TF graph execution process.
  • The second technical difficulty: Horovod can define its own AllReduce operations, but how can those AllReduce operations be embedded into TF's execution flow?

    • Because Horovod's custom HVD OP is independent of TF OPs and cannot be directly inserted into the TF graph for execution, there needs to be a way to register the HVD OP as a TF OP.

0x01 Background Concepts

Let's recall some background concepts.

1.1 Deep learning framework

The core problem of deep learning training is to fit a function F(x) by means of reverse gradient computation, whose purpose is to calculate the gradients and update the parameters. Gradients are computed essentially via the chain rule, and one application of the chain rule corresponds to one forward pass plus one backward pass. The key processes of model training are therefore forward propagation and back propagation.

Taking a simple deep neural network as an example, in order to optimize the loss we divide the data into batches and continuously feed them into the model network, repeating the following iterative process until the network converges:

  • A batch of data is sent to the network for forward propagation, which is a series of matrix + activation function combined operations.
  • After the predicted value of forward propagation output is compared with the true value label, the loss function is used to calculate the loss of this iteration.
  • The loss is propagated back through each layer of the neural network model for reverse gradient calculation, and each layer's weight matrix and bias are updated.

One of the core problems a deep learning framework solves for us is the computation and updating of gradients during back propagation. Without a deep learning framework, we would need to write our own code for the complex gradient calculations and updates.
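
To make this loop concrete, here is a minimal single-machine sketch (plain TensorFlow 2.x eager mode with a toy linear model; this is illustrative code, not Horovod's):

import tensorflow as tf

w = tf.Variable(0.0)
b = tf.Variable(0.0)
optimizer = tf.optimizers.SGD(learning_rate=0.1)

for step in range(100):
    x = tf.random.uniform([32])                    # a batch of input data
    y_true = 3.0 * x + 2.0                         # ground-truth labels
    with tf.GradientTape() as tape:
        y_pred = w * x + b                         # forward propagation
        loss = tf.reduce_mean(tf.square(y_pred - y_true))   # loss of this iteration
    grads = tape.gradient(loss, [w, b])            # back propagation: reverse gradient calculation
    optimizer.apply_gradients(zip(grads, [w, b]))  # update the weight and bias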

1.2 TensorFlow Optimizer

The underlying structure of TensorFlow is a computational graph composed of tensors. The computation graph is the underlying programming model: each computation is a node in the graph, and the dependencies between computations are represented by the edges between nodes. Computational graphs form the structural basis for forward/back propagation.

Given a computational graph, TensorFlow uses automatic differentiation (back propagation) to perform the gradient computation. tf.train.Optimizer lets us update the weights automatically through its minimize() function; tf.train.Optimizer.minimize() does two things:

  • Calculate the gradients: compute_gradients(loss, var_list, ...) computes the gradient of loss with respect to the variables in var_list and returns the list of tuples list(zip(grads, var_list));
  • Apply the computed gradients to update the corresponding weights: apply_gradients(grads_and_vars, global_step=global_step, name=None) takes the return value of compute_gradients(loss, var_list, ...) as input and updates the weight variables;

The reason minimize() is divided into these two steps is that the gradients can be modified in between, for example to prevent them from vanishing or exploding.

TensorFlow also allows the user to process the gradients themselves: after the user has done the intermediate processing, the gradients are applied to the weights for updating. This is broken down into the following three steps (a minimal sketch follows the list):

  • tf.train.Optimizer.compute_gradients is used to calculate the gradients;
  • The gradients are processed by the user; this is where Horovod can step in;
  • The user-processed gradients are applied to the weights with tf.train.Optimizer.apply_gradients.
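
A minimal TF 1.x graph-mode sketch of these three steps (toy loss; the gradient clipping in step 2 stands in for whatever intermediate processing the user, or Horovod, wants to do):

import tensorflow as tf

tf.compat.v1.disable_eager_execution()

w = tf.compat.v1.get_variable("w", initializer=5.0)
loss = tf.square(w - 1.0)
opt = tf.compat.v1.train.GradientDescentOptimizer(learning_rate=0.1)

# Step 1: compute the gradients -> a list of (gradient, variable) pairs
grads_and_vars = opt.compute_gradients(loss)

# Step 2: user-defined processing of the gradients (this is where Horovod hooks in)
processed = [(tf.clip_by_norm(g, 5.0), v) for g, v in grads_and_vars if g is not None]

# Step 3: apply the processed gradients to update the weights
train_op = opt.apply_gradients(processed)

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    for _ in range(10):
        sess.run(train_op)
    print(sess.run(w))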

0x02 Overall Architecture

2.1 General Ideas

Each process of a Horovod job calls single-machine TensorFlow to do the local computation, then collects the gradients, and uses AllReduce to aggregate the gradients and update the model in every process.

Horovod needs to intercept the gradient from TensorFlow.

  • TensorFlow 1.x
    • In TensorFlow 1.x, a deep learning computation is a computation graph that the TensorFlow runtime is responsible for interpreting and executing.
    • Horovod must dive into the graph execution process in order to obtain the computed gradients of each process and AllReduce them. To this end, Horovod completes the AllReduce operation on gradients by wrapping the user's Optimizer: Horovod requires developers to use Horovod's own hvd.DistributedOptimizer instead of the official TensorFlow optimizer, so that the gradients can be obtained during the model optimization phase.
  • TensorFlow 2.0
    • TensorFlow 2.0's eager execution mode takes a completely different approach to computation. During the forward computation, calls to basic units (operators) are recorded on a data structure called a tape; during the backward computation, the tape is replayed backwards to call the gradient operator corresponding to each recorded operator. The tape provides an interface that lets the user obtain the gradient of each parameter (see the minimal example after this list).
    • Horovod calls the TensorFlow 2.0 API to retrieve the gradient directly. Horovod then completes the AllReduce call by encapsulating the tape.
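
As mentioned in the bullets above, the tape records the forward pass and can be replayed backwards; here is a minimal TensorFlow 2.x example of the mechanism (no Horovod involved):

import tensorflow as tf

x = tf.Variable(3.0)
with tf.GradientTape() as tape:
    y = x * x                  # the forward computation is recorded on the tape
grad = tape.gradient(y, x)     # replay the tape backwards: dy/dx = 2 * x
print(float(grad))             # 6.0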

2.2 Overall call relationship

Let's first give the overall calling relationship: hvd.DistributedOptimizer inherits from the Keras Optimizer, and in its overridden get_gradients it passes the gradients to hvd.allreduce(gradients, ...), achieving the collective gradient merge across the entire Horovod cluster (a stripped-down sketch of this wrapping pattern follows the list below).

The logic for calculating the gradient is:

  • TF calls the compute_gradients method of hvd.DistributedOptimizer:
    • hvd.DistributedOptimizer first computes the local gradients using the official TF optimizer.compute_gradients;
    • AllReduce is then used to obtain the average gradient across processes;
    • compute_gradients returns a list of (gradient, weight) pairs, which is consumed by apply_gradients;
  • TF calls the apply_gradients method of hvd.DistributedOptimizer:
    • It calls the official TF optimizer.apply_gradients to process the passed arguments and returns an op that updates the weights; TF can use this return value for subsequent processing.
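
A stripped-down conceptual sketch of this wrapping pattern (this is not Horovod's actual code; allreduce_fn is a placeholder for hvd.allreduce, and all options and error handling are omitted):

import tensorflow as tf

class SimpleDistributedOptimizer(tf.compat.v1.train.Optimizer):
    def __init__(self, optimizer, allreduce_fn, name="SimpleDistributedOptimizer", use_locking=False):
        super().__init__(use_locking, name)
        self._optimizer = optimizer        # the wrapped official optimizer
        self._allreduce_fn = allreduce_fn  # e.g. hvd.allreduce

    def compute_gradients(self, *args, **kwargs):
        # 1) local gradients from the wrapped optimizer
        grads_and_vars = self._optimizer.compute_gradients(*args, **kwargs)
        # 2) average each gradient across all processes
        return [(self._allreduce_fn(g) if g is not None else None, v)
                for g, v in grads_and_vars]

    def apply_gradients(self, *args, **kwargs):
        # 3) let the wrapped optimizer apply the already-averaged gradients
        return self._optimizer.apply_gradients(*args, **kwargs)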

Because of TF version differences, we analyze 1.x and 2.x separately.

0x04 TensorFlow 1.x

As mentioned earlier, Horovod requires developers to use Horovod’s own hvd.DistributedOptimizer instead of the official TensorFlow optimizer, so that the gradient can be obtained during the optimization model phase. So we analyze from _DistributedOptimizer.

4.1 _DistributedOptimizer

Take horovod/tensorflow/__init__.py as an example.

try:
    # TensorFlow 2.x
    _LegacyOptimizer = tf.compat.v1.train.Optimizer
except AttributeError:
    try:
        # TensorFlow 1.x
        _LegacyOptimizer = tf.train.Optimizer
    except AttributeError:
        # Future TensorFlow versions
        _LegacyOptimizer = None

As you can see, for TensorFlow 1.x, the base we will use later is _LegacyOptimizer.

_DistributedOptimizer inherits from _LegacyOptimizer. It wraps another tf.Optimizer and uses allreduce operations to collect and average the gradients before applying them to the model weights. The wrapped tf.Optimizer is the official TF optimizer specified by the user.

Specifically, you can recall how the user used:

# TF Official Optimizer
opt = tf.optimizers.Adam(scaled_lr)

# Wrap the regular TensorFlow Optimizer with Horovod and use Ring-AllReduce to get the average gradient
opt = hvd.DistributedOptimizer(
    opt, backward_passes_per_step=1, average_aggregated_gradients=True)

# The optimizer the model ends up using is hvd.DistributedOptimizer
mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
                    optimizer=opt, metrics=['accuracy'],
                    experimental_run_tf_function=False)

The optimizer opt passed to DistributedOptimizer is assigned to self._optimizer in the constructor __init__.

if _LegacyOptimizer is not None:
    class _DistributedOptimizer(_LegacyOptimizer):
        """An optimizer that wraps another tf.Optimizer, using an allreduce to combine gradient values before applying gradients to model weights."""

        def __init__(self, optimizer, name=None, use_locking=False, device_dense='',
                    device_sparse='', compression=Compression.none,
                    sparse_as_dense=False, op=Average, gradient_predivide_factor=1.0,
                    backward_passes_per_step=1, average_aggregated_gradients=False,
                    groups=None):

            self._optimizer = optimizer # is assigned to self._optimizer in the constructor
            self._allreduce_grads = _make_allreduce_grads_fn( # Set merge function
                name, device_dense, device_sparse, compression, sparse_as_dense, op,
                gradient_predivide_factor, groups)

            self._agg_helper = None
            if backward_passes_per_step > 1:
                # Local gradient accumulation can be done first, then cross-process merging
                self._agg_helper = LocalGradientAggregationHelper( 
                    backward_passes_per_step=backward_passes_per_step,
                    allreduce_func=self._allreduce_grads,
                    sparse_as_dense=sparse_as_dense,
                    average_aggregated_gradients=average_aggregated_gradients,
                    rank=rank(),
                    optimizer_type=LocalGradientAggregationHelper._OPTIMIZER_TYPE_LEGACY,
                )

4.2 compute_gradients

The first step in calculating gradients is to call compute_gradients, which computes the gradient of loss with respect to the variables in var_list and returns the list of tuples list(zip(grads, var_list)).

Every worker's model calls compute_gradients; for each model, gradients = self._optimizer.compute_gradients(*args, **kwargs) is the set of gradients computed locally by that model.

DistributedOptimizer overrides the compute_gradients() method of the Optimizer class.

  • _DistributedOptimizer configures self._allreduce_grads = _make_allreduce_grads_fn(...) during initialization. This is important.
  • The compute_gradients() method first calls compute_gradients() of the originally configured official TF optimizer, which returns a list of tuples, each element being (gradient, variable), where gradient is the gradient with respect to that variable;
  • If _agg_helper (a LocalGradientAggregationHelper) is set, it is called to accumulate gradients locally (the goal of local accumulation is to reduce the number of cross-process communications; cross-process merging only happens once a certain number of steps has been reached); otherwise _allreduce_grads is called, i.e., cross-process merging is done directly (the computed gradients are allreduced, e.g., over MPI);
        def compute_gradients(self, *args, **kwargs):
            """Compute gradients of all trainable variables.

            See Optimizer.compute_gradients() for more info.

            In DistributedOptimizer, compute_gradients() is overridden to also
            allreduce the gradients before returning them.
            """
            # _optimizer is the originally configured official optimizer; its compute_gradients
            # method is called first to compute the gradients of all trainable parameters.
            # The official optimizer's compute_gradients() returns a list of (gradient, variable)
            # tuples, which is assigned to gradients.
            gradients = self._optimizer.compute_gradients(*args, **kwargs)
            grads, vars = zip(*gradients)

            if self._agg_helper:  # Whether to accumulate locally first
                avg_grads = self._agg_helper.compute_gradients(grads, vars)
            else:
                avg_grads = self._allreduce_grads(grads, vars)
            return list(zip(avg_grads, vars))

The logic is as follows:

_DistributedOptimizer
    |   self._optimizer   -->  the wrapped tf.Optimizer
    |   _allreduce_grads  -->  function built by _make_allreduce_grads_fn
    |
    +--> compute_gradients
             |
             +--> self._optimizer.compute_gradients            (local gradients)
             |
             +--> _agg_helper.compute_gradients(grads, vars)   (if local aggregation is configured)
             |    or _allreduce_grads(grads, vars)             (direct cross-process allreduce)
             |
             +--> return list(zip(avg_grads, vars))

4.3 LocalGradientAggregationHelper

As mentioned earlier, if _agg_helper (a LocalGradientAggregationHelper) is set, it is called to accumulate gradients locally (after local accumulation, cross-process merging still takes place). So let's talk about LocalGradientAggregationHelper.

LocalGradientAggregationHelper updates gradients locally, but since the member self._allreduce_grads = allreduce_func set at initialization is a cross-process allreduce function, LocalGradientAggregationHelper also performs cross-process allreduce. That is, a cross-machine update happens once every backward_passes_per_step steps.

Note: because allreduce_func = self._allreduce_grads, when LocalGradientAggregationHelper internally calls self._allreduce_grads it is in fact calling the function produced by _make_allreduce_grads_fn (a toy sketch of the local-accumulation idea is given after the list below).

LocalGradientAggregationHelper(
                        backward_passes_per_step=backward_passes_per_step,
                        allreduce_func=self._allreduce_grads, # is _make_allreduce_grads_fn
                        sparse_as_dense=sparse_as_dense,
                        average_aggregated_gradients=average_aggregated_gradients,
                        rank=rank(),
                        optimizer_type=LocalGradientAggregationHelper._OPTIMIZER_TYPE_KERAS,
                    )

Concretely, the work is done by LocalGradientAggregationHelper.compute_gradients, which includes:

  • The _init_aggregation_vars function traverses the local list of (gradient, variable) tuples and accumulates them in locally_aggregated_grads.
  • allreduce_grads traverses the tensors and, for each tensor, uses the _allreduce_grads_helper function to merge it across processes.
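
Before reading the real implementation, the idea behind backward_passes_per_step can be illustrated with a toy eager-style sketch (not Horovod's graph-mode code; allreduce_fn stands in for the cross-process merge):

import tensorflow as tf

class ToyLocalAggregator:
    def __init__(self, allreduce_fn, backward_passes_per_step):
        self.allreduce_fn = allreduce_fn
        self.n = backward_passes_per_step
        self.counter = 0
        self.buffers = None

    def step(self, grads):
        if self.buffers is None:
            self.buffers = [tf.Variable(tf.zeros_like(g)) for g in grads]
        for buf, g in zip(self.buffers, grads):
            buf.assign_add(g)                     # accumulate locally
        self.counter += 1
        if self.counter == self.n:                # time to merge across processes
            merged = [self.allreduce_fn(buf / self.n) for buf in self.buffers]
            for buf in self.buffers:
                buf.assign(tf.zeros_like(buf))    # reset the local buffers
            self.counter = 0
            return merged
        return grads                              # otherwise return the raw input gradients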

4.3.1 _init_aggregation_vars

The _init_aggregation_vars function traverses the local list of (gradient, variable) tuples and accumulates them in locally_aggregated_grads.

def _init_aggregation_vars(self, grads):
    """ Initializes the counter that is used when to communicate and aggregate gradients and the tensorflow variables that store the locally aggregated gradients. """
    variable_scope_name = "aggregation_variables_" + str(self.rank)
    with tf.compat.v1.variable_scope(variable_scope_name, reuse=tf.compat.v1.AUTO_REUSE):
        self.counter = tf.compat.v1.get_variable(
            "aggregation_counter", shape=(), dtype=tf.int32,
            trainable=False, initializer=tf.compat.v1.zeros_initializer(),
            collections=[tf.compat.v1.GraphKeys.LOCAL_VARIABLES],
        )
        # Traverse the local gradients
        for idx, grad in enumerate(grads):
            # Handle IndexedSlices.
            # If IndexedSlices, converted to tensors
            if self.sparse_as_dense and isinstance(grad, tf.IndexedSlices):
                grad = tf.convert_to_tensor(grad)
            elif isinstance(grad, tf.IndexedSlices):
                raise ValueError(
                    "IndexedSlices are not supported when "
                    "`backward_passes_per_step` > 1 and "
                    "`sparse_as_dense` is False."
                )

            # Handle grads that are None.
            # If empty, skip
            if grad is None:
                self.num_none_grad_updates += 1
                continue
            self.not_none_indexes[idx] = len(self.locally_aggregated_grads)

            # Create shadow variable.
            grad_aggregation_variable_name = str(idx)
            zero_grad = tf.zeros(shape=grad.get_shape().as_list(), dtype=grad.dtype)
            grad_aggregation_variable = tf.compat.v1.get_variable(
                grad_aggregation_variable_name,
                trainable=False,
                initializer=zero_grad,
                collections=[
                    tf.compat.v1.GraphKeys.LOCAL_VARIABLES,
                    "aggregating_collection"],)# added to the local cumulative variable locally_aggregated_grads
            self.locally_aggregated_grads.append(grad_aggregation_variable)
        assert len(self.locally_aggregated_grads) + \
            self.num_none_grad_updates == len(grads)

    # We expect to get a `sess` when we need to manually do a `sess.run(...) `
    # for the variables to be initialized. This is the `tf.keras`
    # optimizers.
    # Traverse the variables of locally_aggregated_grads, initializing if required
    if self.optimizer_type == self._OPTIMIZER_TYPE_KERAS:
        session = tf.compat.v1.keras.backend.get_session(op_input_list=())
        vars_init_op = tf.compat.v1.variables_initializer(
            [self.counter, *get_not_none_from_list(self.locally_aggregated_grads)]
        )
        session.run(vars_init_op)

4.3.2 compute_gradients

The compute_gradients method is as follows:

    def compute_gradients(self, grads, vars):
        """ Applies the new gradient updates the locally aggregated gradients, and performs cross-machine communication every backward_passes_per_step times it is called. """
        # Traverse a list of local tuples (gradient, variable), cumulative in locally_aggregated_grads
        self._init_aggregation_vars(grads)

        # Clear the locally aggregated gradients when the counter is at zero.
        # If the counter is 0, clear the locally accumulated gradients
        clear_op = tf.cond(
            pred=tf.equal(self.counter, 0),
            true_fn=lambda: self._clear_grads(),
            false_fn=tf.no_op
        )

        # Add new gradients to the locally aggregated gradients.
        # Local cumulative gradient
        with tf.control_dependencies([clear_op]):
            aggregation_ops_list = self._aggregate_grads(grads)

        # Increment the counter once new gradients have been applied.
        # Once the new gradients have been applied, increment the counter by 1
        aggregation_ops = tf.group(*aggregation_ops_list)
        with tf.control_dependencies([aggregation_ops]):
            update_counter = self.counter.assign_add(tf.constant(1))

        # Apply gradient
        with tf.control_dependencies([update_counter]):
            grads = get_not_none_from_list(grads)
            assert len(grads) == len(self.locally_aggregated_grads)

            # Allreduce locally aggregated gradients when the counter is equivalent to
            # `backward_passes_per_step`. This the condition is true, it also resets
            # the counter back to 0.
            allreduced_grads = tf.cond(
                tf.equal(self.counter, self.backward_passes_per_step), # Determine whether allReduce can be used
                lambda: self._allreduce_grads_helper(grads, vars),  # if counter == backward_passes_per_step, apply cross-process allreduce to the locally accumulated gradients
                lambda: grads,  # otherwise, pass through the input gradients
            )

            # Handle case where there is only one variable.
            if not isinstance(allreduced_grads, (list, tuple)):
                allreduced_grads = (allreduced_grads,)

            # Insert gradients that are None back in.
            # Put the None gradients back at their original indexes; locally_aggregated_grads holds the locally accumulated gradients
            allreduced_grads = [
                allreduced_grads[self.not_none_indexes[idx]] if idx in self.not_none_indexes else None
                for idx in range(len(self.locally_aggregated_grads) + self.num_none_grad_updates)
            ]

        # If gradients have not been allreduced this batch, we return the gradients
        # that were submitted as the updates (the input).
        return allreduced_grads  # Return the cross-process merged gradients, or the original input gradients

The logic expands as follows; note that only one of _agg_helper and _allreduce_grads is executed:

  • If _agg_helper (a LocalGradientAggregationHelper) is set, _agg_helper is called to compute the gradients (after local accumulation, cross-process merging still takes place);
  • Otherwise _allreduce_grads, i.e., the function built by _make_allreduce_grads_fn, is called, which merges across processes directly (the computed gradients are allreduced, e.g., over MPI);
_DistributedOptimizer
    |   self._optimizer   -->  the wrapped tf.Optimizer
    |   _allreduce_grads  -->  function built by _make_allreduce_grads_fn
    |
    +--> compute_gradients
             |
             +--> self._optimizer.compute_gradients            (local gradients)
             |
             +--> either _agg_helper.compute_gradients(grads, vars)
             |           (LocalGradientAggregationHelper: _init_aggregation_vars
             |            --> _allreduce_grads_helper, i.e. allreduce_func
             |            --> allreduced_grads)
             |    or     _allreduce_grads(grads, vars)
             |           (both paths end up in _make_allreduce_grads_fn --> _allreduce_cond)
             |
             +--> return list(zip(avg_grads, vars))

The details are as follows:

4.4 _make_allreduce_grads_fn

_make_allreduce_grads_fn does its work by calling _make_cached_allreduce_grads_fn.

def _make_allreduce_grads_fn(name, device_dense, device_sparse, compression, sparse_as_dense, op, gradient_predivide_factor, groups):
    groups = vars_to_refs(groups) if isinstance(groups, list) else groups
    return _make_cached_allreduce_grads_fn(name, device_dense, device_sparse,
                                           compression, sparse_as_dense, op,
                                           gradient_predivide_factor, groups)


_make_cached_allreduce_grads_fn does:

  • Get all grads;
  • Iterate over the list of tuples (gradient, variable). For each grad, use _allreduce_cond to synchronize with other workers;
  • Finally, a list of the synchronized gradients is returned;
@_cache
def _make_cached_allreduce_grads_fn(name, device_dense, device_sparse, compression, sparse_as_dense, op, gradient_predivide_factor, groups):
    groups = refs_to_vars(groups) if isinstance(groups, tuple) else groups
    ......
    def allreduce_grads(grads, vars=None):
        # Set the namespace
        with tf.name_scope(name + "_Allreduce"):
            ...
            # Retrieve all grads; the list looks like [(grad0, var0), (grad1, var1), ...].
            # There may be many None entries, so allreduce only the grads that are not None.
            return [_allreduce_cond(grad,
                                    device_dense=device_dense,
                                    device_sparse=device_sparse,
                                    compression=compression,
                                    op=op,
                                    prescale_factor=prescale_factor,
                                    postscale_factor=postscale_factor)
                    if grad is not None else grad
                    for grad in grads]

    if _executing_eagerly():
        return _make_subgraph(allreduce_grads)
    else:
        return allreduce_grads

In the _allreduce_cond function, allreduce is called to perform the collective communication operation.

def _allreduce_cond(tensor, *args, **kwargs):
    def allreduce_fn():
        return allreduce(tensor, *args, **kwargs)

    def id_fn():
        return tensor

    return tf.cond((size_op() > 1) if int(os.environ.get("HOROVOD_ELASTIC", 0)) else tf.convert_to_tensor(size() > 1),
                   allreduce_fn, id_fn)

4.5 allreduce

In the allreduce() method, different processing is done depending on whether the tensor to be transferred is a plain Tensor or an IndexedSlices.

  • If the tensor is an IndexedSlices, only an allgather is needed; whether anything else is needed depends on additional configuration.
    • This is because, for IndexedSlices distributed across different workers, the values and indices do not overlap with one another.
    • Suppose the indices on worker 1 are [1, 3, 5, 7, 9] and the indices on worker 2 are [2, 4, 6, 8, 10]. All that is needed is an allgather to get [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; no summing/averaging is required.
    • If there are additional operations, further processing is required.
  • If it is a plain Tensor, the _allreduce method needs to be called: sum the tensors and then average them.
def allreduce(tensor, average=None, device_dense='', device_sparse='',
              compression=Compression.none, op=None,
              prescale_factor=1.0, postscale_factor=1.0,
              name=None):
    """Perform an allreduce on a tf.Tensor or tf.IndexedSlices. """
    op = handle_average_backwards_compatibility(op, average)

    if isinstance(tensor, tf.IndexedSlices): # For type IndexedSlices
        # TODO: Need to fix this to actually call Adasum
        if op == Adasum:
            ...  # error handling omitted (Adasum is not supported for sparse tensors here)
        with tf.device(device_sparse):
            # For IndexedSlices, do two allgathers instead of an allreduce.
            # Do two allgathers
            horovod_size = tf.cast(size_op() if int(os.environ.get("HOROVOD_ELASTIC", 0)) else size(),
                                   dtype=tensor.values.dtype)
            values = allgather(tensor.values)   # One allgather handles the values
            indices = allgather(tensor.indices) # Another allgather handles the indices

            # To make this operation into an average, divide allgathered values by
            # the Horovod size.
            # If op is Average, compute the average of all values; otherwise leave them as-is
            new_values = (values / horovod_size) if op == Average else values
        return tf.IndexedSlices(new_values, indices,
                                dense_shape=tensor.dense_shape)
    else:  # A plain Tensor
        average_in_framework = False
        if rocm_built():
            # For ROCm, perform averaging at framework level
            average_in_framework = op == Average or op == Adasum
            op = Sum if op == Average else op

        with tf.device(device_dense):
            # First, convert the size_op() result to tensor's dType
            horovod_size = tf.cast(size_op() if int(os.environ.get("HOROVOD_ELASTIC", 0)) else size(),
                                   dtype=tensor.dtype)
            tensor_compressed, ctx = compression.compress(tensor)
            # Define a sum operation that sums the (compressed) tensor with the tensors of the same name in all other Horovod processes
            summed_tensor_compressed = _allreduce(tensor_compressed, op=op,
                                                  prescale_factor=prescale_factor,
                                                  postscale_factor=postscale_factor,
                                                  name=name)
            summed_tensor = compression.decompress(summed_tensor_compressed, ctx)
            if op == Adasum:  # Handle additional Adasum processing
                if 'CPU' not in tensor.device and gpu_available('tensorflow'):
                    if nccl_built():
                        if not is_homogeneous:
                            ...  # error handling omitted
                        elif not check_num_rank_power_of_2(int(size() / local_size())):
                            ...  # error handling omitted
                        if rocm_built():
                            horovod_local_size = tf.cast(local_size_op() if int(os.environ.get("HOROVOD_ELASTIC", 0)) else local_size(),
                                                         dtype=tensor.dtype)
                            new_tensor = summed_tensor / horovod_local_size
                        else:
                            new_tensor = summed_tensor
                    else:
                        new_tensor = summed_tensor
                else:
                    new_tensor = summed_tensor
            else:
                if rocm_built():
                    new_tensor = (summed_tensor / horovod_size) if average_in_framework else summed_tensor
                else:
                    new_tensor = summed_tensor
        return new_tensor
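
To see why the IndexedSlices branch above only needs allgather, here is a toy illustration in plain Python (no Horovod): the workers hold disjoint indices, so concatenation alone reconstructs the full sparse gradient and no summing is required.

worker1 = {"indices": [1, 3, 5, 7, 9],  "values": [0.1, 0.3, 0.5, 0.7, 0.9]}
worker2 = {"indices": [2, 4, 6, 8, 10], "values": [0.2, 0.4, 0.6, 0.8, 1.0]}

gathered_indices = worker1["indices"] + worker2["indices"]   # "allgather" on the indices
gathered_values  = worker1["values"]  + worker2["values"]    # "allgather" on the values

print(sorted(zip(gathered_indices, gathered_values)))
# [(1, 0.1), (2, 0.2), (3, 0.3), ..., (10, 1.0)]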

4.6 _allreduce

The _allreduce and allgather methods are in horovod/tensorflow/mpi_ops.py.

HorovodAllreduceOp and HorovodAllgatherOp are HVD custom OPs related to TensorFlow; _allreduce and allgather correspond to them, respectively.

  • _allreduce is bound to HorovodAllreduceOp under the name "HorovodAllreduce"; MPI_LIB.horovod_allreduce does the intermediate conversion.
  • allgather is bound to HorovodAllgatherOp under the name "HorovodAllgather"; MPI_LIB.horovod_allgather does the intermediate conversion.

This calls the corresponding MPI operation.

Combined with the previous configuration for the namespace in _make_cached_allreduce_grads_fn, the tensor name is roughly: DistributedAdam_Allreduce/cond_14/HorovodAllreduce_grads_5_0.
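
Such names arise simply from nested name scopes; here is a small graph-mode sketch (unrelated to Horovod) of how tf.name_scope prefixes op names:

import tensorflow as tf

g = tf.Graph()
with g.as_default():
    with tf.compat.v1.name_scope("DistributedAdam_Allreduce"):
        t = tf.constant(1.0, name="HorovodAllreduce_grads_5")

print(t.name)   # DistributedAdam_Allreduce/HorovodAllreduce_grads_5:0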

def _allreduce(tensor, name=None, op=Sum, prescale_factor=1.0, postscale_factor=1.0,
               ignore_name_scope=False):
    """An op which reduces an input tensor over all the Horovod processes. The default reduction is a sum. The reduction operation is keyed by the name of the op. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor. Returns: A tensor of the same shape and type as `tensor`, summed across all processes. """
    if name is None and not _executing_eagerly():
        name = 'HorovodAllreduce_%s' % _normalize_name(tensor.name)
    return MPI_LIB.horovod_allreduce(tensor, name=name, reduce_op=op,
                                     prescale_factor=prescale_factor,
                                     postscale_factor=postscale_factor,
                                     ignore_name_scope=ignore_name_scope)
  
def allgather(tensor, name=None, ignore_name_scope=False):
    """An op which concatenates the input tensor with the same input tensor on all other Horovod processes. The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different. Returns: A tensor of the same type as `tensor`, concatenated on dimension zero across all processes. The shape is identical to the input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the tensors in different Horovod processes. """
    if name is None and not _executing_eagerly():
        name = 'HorovodAllgather_%s' % _normalize_name(tensor.name)
    return MPI_LIB.horovod_allgather(tensor, name=name,
                                     ignore_name_scope=ignore_name_scope)  

4.7 Mapping Operations

In the Python world, _allreduce is called with several arguments such as tensor and name, of which op=Sum is the most important: it is used internally by the C++ code to determine the reduction operation. Let's go through the details:

4.7.1 C++ definitions

In C++ there are:

enum ReduceOp {
    AVERAGE = 0, // This value should never appear past framework code, as
                 // averaging is taken care of there.
    SUM = 1,
    ADASUM = 2
};

int horovod_reduce_op_sum() {
  return ReduceOp::SUM;
}

4.7.2 Obtaining the Python configuration

In the Python initialization code:

class HorovodBasics(object):
    """Wrapper class for the basic Horovod API."""

    def __init__(self, pkg_path, *args):
        full_path = util.get_extension_full_path(pkg_path, *args)
        self.MPI_LIB_CTYPES = ctypes.CDLL(full_path, mode=ctypes.RTLD_GLOBAL)

        self.Average = self.MPI_LIB_CTYPES.horovod_reduce_op_average()
        self.Sum = self.MPI_LIB_CTYPES.horovod_reduce_op_sum() # connect here
        self.Adasum = self.MPI_LIB_CTYPES.horovod_reduce_op_adasum()

Thus, the default parameter of _allreduce is op=Sum, which corresponds to ReduceOp::SUM in C++.

4.7.3 Establishing the connection

_allreduce goes on to call:

MPI_LIB.horovod_allreduce(tensor, name=name, reduce_op=op, ...)

MPI_LIB.horovod_allreduce maps to the following code in the C++ world:

  • First, reduce_op_ is obtained from the op's attributes via OP_REQUIRES_OK;
  • Second, in ComputeAsync, reduce_op_ determines which specific operation needs to be called;

Thus, Python and the C++ world are further linked.

class HorovodAllreduceOp : public AsyncOpKernel {
public:
  explicit HorovodAllreduceOp(OpKernelConstruction* context)
      : AsyncOpKernel(context) {
    // We'll declare that we get the reduce_op from the context and assign it to the reduce_op_
    OP_REQUIRES_OK(context, context->GetAttr("reduce_op", &reduce_op_));
    // Omit irrelevant code
  }

  void ComputeAsync(OpKernelContext* context, DoneCallback done) override {
    OP_REQUIRES_OK_ASYNC(context, ConvertStatus(common::CheckInitialized()),
                         done);
    // Omit irrelevant code
    // the reduce_op_ is used to determine what operations C++ calls internally
    horovod::common::ReduceOp reduce_op = static_cast<horovod::common::ReduceOp>(reduce_op_);
    // Omit irrelevant code
  }

4.8 Extended Flow

We extend the current flow chart as follows:

_DistributedOptimizer
    |   self._optimizer   -->  the wrapped tf.Optimizer
    |   _allreduce_grads  -->  function built by _make_allreduce_grads_fn
    |
    +--> compute_gradients
             |
             +--> self._optimizer.compute_gradients            (local gradients)
             |
             +--> either _agg_helper.compute_gradients(grads, vars)
             |           (LocalGradientAggregationHelper --> _allreduce_grads_helper, i.e. allreduce_func)
             |    or     _allreduce_grads(grads, vars)
             |               |
             |               v
             |    _make_allreduce_grads_fn --> _allreduce_cond --> allreduce --> _allreduce --> MPI_LIB.horovod_allreduce
             |
             +--> return list(zip(avg_grads, vars))


0x05 TensorFlow 2.x

5.1 Horovod implementation

For TF 2.x, each line of code executes eagerly in order; there is no graph to build and control_dependencies are gone. Horovod can obtain the gradients quite directly by calling the TensorFlow 2.0 API, so the gradient-update part of Horovod is not implemented on top of the computation graph; it uses hvd.DistributedGradientTape instead.

Each worker does the following during training:

  • Use DistributedGradientTape to wrap the official TF tape and configure the allreduce function.
  • Read a batch of training data.
  • Call the forward propagation of the local model to compute the loss.
  • Given the loss, the worker uses the GradientTape mechanism of TensorFlow eager execution to call the base class function and obtain the gradients.
  • Each worker calls allreduce to synchronize the gradients.
  • Each worker updates its model according to the latest gradients.

5.2 Example Code

First, we give the following example code; some non-critical code is omitted, see the comments:

# Horovod: initialize Horovod.
hvd.init() # Initialize HVD

# Horovod: pin GPU to be used to process local rank (one GPU per process)
# configuration GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Load data
(mnist_images, mnist_labels), _ = \
    tf.keras.datasets.mnist.load_data(path='mnist-%d.npz' % hvd.rank())

# Slice the data into features
dataset = tf.data.Dataset.from_tensor_slices(
    (tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
             tf.cast(mnist_labels, tf.int64))
)
# Scramble data and load in batches
dataset = dataset.repeat().shuffle(10000).batch(128)

mnist_model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, [3, 3], activation='relu'),
    tf.keras.layers.Conv2D(64, [3, 3], activation='relu'),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Dropout(0.25),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(10, activation='softmax')
])

# Loss function
loss = tf.losses.SparseCategoricalCrossentropy()

# Horovod: adjust learning rate based on number of GPUs.
opt = tf.optimizers.Adam(0.001 * hvd.size())

@tf.function
def training_step(images, labels, first_batch):
    with tf.GradientTape() as tape:
        probs = mnist_model(images, training=True)
        loss_value = loss(labels, probs)

    # Horovod: add Horovod Distributed GradientTape.
    # Call DistributedGradientTape to configure the allreduce function
    tape = hvd.DistributedGradientTape(tape)

    # Explicitly obtain the gradients; after a series of internal operations this calls Horovod's allreduce and eventually the MPI_LIB.horovod_allreduce function
    grads = tape.gradient(loss_value, mnist_model.trainable_variables)
    # Apply the gradient and update the weights
    opt.apply_gradients(zip(grads, mnist_model.trainable_variables))

    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    #
    # Note: broadcast should be done after the first gradient step to ensure optimizer
    # initialization.
    # Broadcast variables
    if first_batch:
        hvd.broadcast_variables(mnist_model.variables, root_rank=0)
        hvd.broadcast_variables(opt.variables(), root_rank=0)

    return loss_value


# Horovod: adjust number of steps based on number of GPUs.
for batch, (images, labels) in enumerate(dataset.take(10000 // hvd.size())):
    loss_value = training_step(images, labels, batch == 0)
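
To actually run this example across multiple processes, it is typically launched with something like horovodrun -np 4 python train.py (train.py being a hypothetical script name), so that hvd.size() is 4 and each process pins its own GPU via hvd.local_rank().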

5.3 _DistributedGradientTape

The key class _DistributedGradientTape is defined as follows:

class _DistributedGradientTape(tf.GradientTape):
    def __init__(self, tape, device_dense, device_sparse, compression, sparse_as_dense, op,
                 gradient_predivide_factor, groups, persistent=False,
                 watch_accessed_variables=True):
        if hasattr(tape, '_watch_accessed_variables'):
            super(self.__class__, self).__init__(persistent, watch_accessed_variables)
        else:
            super(self.__class__, self).__init__(persistent)

        # Save the official TF tape
        self._tape = tape
        # Configure the allreduce function
        self._allreduce_grads = _make_allreduce_grads_fn(
            'DistributedGradientTape', device_dense, device_sparse, compression,
            sparse_as_dense, op, gradient_predivide_factor, groups)

    # The user calls this function explicitly; internally it uses the function built by _make_allreduce_grads_fn
    def gradient(self, target, sources, output_gradients=None):
        # Call the base class to obtain the gradients, then allreduce them
        gradients = super(self.__class__, self).gradient(target, sources, output_gradients)
        return self._allreduce_grads(gradients, sources)

The function produced by _make_allreduce_grads_fn makes a series of calls and finally invokes MPI_LIB.horovod_allreduce. The specific work is as follows:

  • Change the name scope by adding the suffix _Allreduce;
  • If sparse_as_dense is configured, convert sparse gradients to dense tensors (compression itself is applied later inside allreduce);
  • Depending on the condition, call allreduce or simply return the tensor;
  • The name scope of DistributedGradientTape becomes DistributedGradientTape_Allreduce, and op names inside it get the prefix HorovodAllreduce_;
  • Call the MPI_LIB.horovod_allreduce function;
@_cache
def _make_allreduce_grads_fn(name, device_dense, device_sparse, compression, sparse_as_dense, op):
    def allreduce_grads(grads):
        with tf.name_scope(name + "_Allreduce"):  # Modify the name scope, adding the suffix
            if sparse_as_dense:
                grads = [tf.convert_to_tensor(grad)  # Convert sparse IndexedSlices to dense tensors
                         if grad is not None and isinstance(grad, tf.IndexedSlices)
                         else grad for grad in grads]

            return [_allreduce_cond(grad,
                                    device_dense=device_dense,
                                    device_sparse=device_sparse,
                                    compression=compression,
                                    op=op)
                    if grad is not None else grad
                    for grad in grads]

def _allreduce_cond(tensor, *args, **kwargs):
    def allreduce_fn():
        return allreduce(tensor, *args, **kwargs)

    def id_fn():
        return tensor

    return tf.cond(size_op() > 1, allreduce_fn, id_fn)  # Only allreduce when running with more than one process

def _allreduce(tensor, name=None, op=Sum):
    """An op which reduces an input tensor over all the Horovod processes. The default reduction is a sum. The reduction operation is keyed by the name of the op. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor. Returns: A tensor of the same shape and type as `tensor`, summed across all processes. """
    if name is None and not _executing_eagerly():
        name = 'HorovodAllreduce_%s' % _normalize_name(tensor.name)
    # Call HorovodAllreduceOp
    return MPI_LIB.horovod_allreduce(tensor, name=name, reduce_op=op) 

The logic is as follows:

_DistributedGradientTape
    |   self._tape        -->  the wrapped tf.GradientTape
    |   _allreduce_grads  -->  function built by _make_allreduce_grads_fn
    |
    +--> gradient
             |
             +--> tf.GradientTape.gradient (base class: local gradients)
             |
             +--> _allreduce_grads --> _allreduce_cond --> allreduce --> _allreduce --> MPI_LIB.horovod_allreduce

0x06 HorovodAllreduceOp

HorovodAllreduceOp is what MPI_LIB.horovod_allreduce calls. MPI_LIB.horovod_allreduce is a Python function, while HorovodAllreduceOp is C++ code; TF provides the adaptation in between, so that a Python function can call directly into the C++ implementation.

HorovodAllreduceOp inherits from AsyncOpKernel, i.e., it is a TF async OP, and it is registered with TF via REGISTER_KERNEL_BUILDER, so it can be embedded into the TF execution flow.

TF calls the ComputeAsync method overridden by HorovodAllreduceOp. In ComputeAsync, the allreduce operation for the tensor is enqueued into Horovod's background queue, thereby connecting the TF OP with the Horovod OP.

In summary, HorovodAllreduceOp inherits from the TF AsyncOpKernel, so it can be embedded into the TF execution flow while cooperating with the Horovod background thread.

class HorovodAllreduceOp : public AsyncOpKernel { // is derived, so it can be embedded in a TF process
public:
  explicit HorovodAllreduceOp(OpKernelConstruction* context)
      : AsyncOpKernel(context) {
    OP_REQUIRES_OK(context, context->GetAttr("reduce_op", &reduce_op_));
    OP_REQUIRES_OK(context, context->GetAttr("prescale_factor", &prescale_factor_));
    OP_REQUIRES_OK(context, context->GetAttr("postscale_factor", &postscale_factor_));
    OP_REQUIRES_OK(context, context->GetAttr("ignore_name_scope", &ignore_name_scope_));
  }

  void ComputeAsync(OpKernelContext* context, DoneCallback done) override {
    OP_REQUIRES_OK_ASYNC(context, ConvertStatus(common::CheckInitialized()), done);
    // Omit some variable validation and initialization code
          
    // Queue the Allreduce operation OP of the tensor
    auto enqueue_result = EnqueueTensorAllreduce(
        hvd_context, hvd_tensor, hvd_output, ready_event, node_name, device,
        [context, done](const common::Status& status) {
          context->SetStatus(ConvertStatus(status));
          done();
        }, reduce_op, (double) prescale_factor_, (double) postscale_factor_);
    OP_REQUIRES_OK_ASYNC(context, ConvertStatus(enqueue_result), done);
  }

private:
  int reduce_op_;
  // Using float since TF does not support double OP attributes
  float prescale_factor_;
  float postscale_factor_;
  bool ignore_name_scope_;
};
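
On the Python side, a kernel like this becomes reachable through standard TensorFlow custom-op loading; a generic sketch (the .so path below is hypothetical, not Horovod's actual build artifact):

import tensorflow as tf

lib = tf.load_op_library("/path/to/mpi_lib.so")
# Each op registered in the library (e.g. under the name "HorovodAllreduce")
# is exposed as a snake_case Python function:
#   lib.horovod_allreduce(tensor, reduce_op=1, ...)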

The next article will introduce Horovod on Spark.

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking
