0x00 Abstract

In this article, we introduce how PyTorch implements data parallelism with DataParallel, covering data distribution, model replication, and gradient handling.

Other articles in this series are as follows:

Automatic Differentiation of Deep Learning Tools (1)

Automatic Differentiation of Deep Learning Tools (2)

Automatic differentiation of Deep Learning Tools (3) — Interpretation of examples

PyTorch implements forward propagation (1) — Base class (1)

PyTorch implements forward propagation (2) — Base class (2)

PyTorch how to implement forward propagation (3) – implementation

How to implement back propagation (1)—- call engine

Pytorch how to implement backward propagation (2)—- engine static structure

Pytorch how to implement backward propagation (3)—- engine dynamic logic

PyTorch how to implement backward propagation (4)—- specific algorithm

PyTorch distributed (1)—— History and Overview

PyTorch how to use GPU

Note: This article draws heavily on the following two articles. Many thanks to their authors.

Distributed data parallel training using Pytorch on AWS

DP & DDP for PyTorch source Code Interpretation: Model parallel and Distributed training parsing

0x01 Overview

Let’s first look at DataParallel from various angles.

1.1 From the perspective of the workflow

DataParallel works by loading an entire minibatch onto the main GPU in the main thread and then scattering sub-minibatches across the whole group of GPUs.

  1. The minibatch data is transferred from page-locked memory to GPU 0 (the master). The master GPU also holds the model; the other GPUs hold stale copies of the model.

  2. Scatter the minibatch data among the GPUs. Specifically, the input minibatch is split into multiple parts and each part is sent to the corresponding GPU for computation.

  3. Copy the model to the GPUs. All data associated with the Module is copied multiple times as well.

  4. Run forward propagation on each GPU to compute the outputs. PyTorch uses multiple threads to run the forward pass in parallel: each GPU performs the forward computation for its own slice of the input independently, in a separate thread.

  5. Gather the outputs on the master GPU and compute the loss, i.e. compare the network output with the ground-truth label of each element in the batch to obtain the loss value.

  6. Scatter the loss to the GPUs, run backward propagation on each GPU, and compute the parameter gradients.

  7. Reduce (sum) the gradients on GPU 0.

  8. Update the model parameters; see the sketch after this list.

    • Perform gradient descent and update the model parameters on the main GPU.
    • Since the model parameters are only updated on the master GPU while the other slave GPUs are not updated synchronously, the updated parameters must be copied to the remaining slave GPUs to keep the replicas consistent.
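A minimal sketch (not from the original article; it assumes at least two visible GPUs and uses a toy linear model) makes steps 5-8 concrete: the gathered output and the loss live on GPU 0, and after backward the reduced gradients sit on the master copy of the parameters.

import torch
import torch.nn as nn

model = nn.Linear(10, 2).cuda()              # parameters start on GPU 0 (device_ids[0])
dp_model = nn.DataParallel(model)            # wrap with DP

x = torch.randn(32, 10).cuda()               # minibatch on the master GPU
y = torch.randint(0, 2, (32,)).cuda()

out = dp_model(x)                            # scatter, replicate, parallel forward, gather
print(out.device)                            # cuda:0 -- outputs are gathered on the master GPU

loss = nn.CrossEntropyLoss()(out, y)         # loss is computed on GPU 0
loss.backward()                              # backward on each GPU, gradients reduced onto GPU 0
print(next(model.parameters()).grad.device)  # cuda:0 -- gradients live on the master copy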

1.2 From the perspective of the pattern

Let’s start with a technical overview, from a pattern perspective:

  • DP can be thought of as an application similar to a parameter server.
  • DDP can be thought of as an application of collective communication.

A parameter server architecture can be roughly divided into a master and workers. DP runs on a single machine with multiple cards, so the correspondence is as follows:

  • Worker: all GPUs (including GPU 0) are workers and are responsible for computing and training the network.
  • Master: GPU 0 additionally acts as the master; it aggregates the gradients and updates the parameters.

So let’s focus on GPU 0.

A DataParallel-based network model is placed on GPU 0 by default and then replicated from GPU 0 to the other GPUs. Each GPU trains in parallel, while GPU 0, acting as the master, aggregates gradients, updates the model, and distributes the computation tasks to the other GPUs. This is very similar to the parameter-server mechanism.

The same information can be seen from the official chart.

1.3 From the perspective of the operating system

From the operating system's perspective, DP and DDP differ as follows:

  • DataParallel is a single-process, multi-threaded parallel training scheme that can only run on a single machine.
  • DistributedDataParallel is multi-process and is suitable for both single-machine and multi-machine training. DistributedDataParallel also replicates the model up front, rather than at each iteration, and avoids the penalty of the Python global interpreter lock.
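For contrast, here is a minimal multi-process DDP sketch (a simplified illustration, not part of the original article; the address, port, and the toy model are placeholder assumptions):

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

def worker(rank, world_size):
    # One process per GPU; each process owns exactly one device.
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"        # placeholder port
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    model = nn.Linear(10, 2).cuda(rank)
    ddp_model = DDP(model, device_ids=[rank])  # replicated once, up front

    out = ddp_model(torch.randn(8, 10).cuda(rank))
    out.sum().backward()                       # gradients synchronized via all-reduce
    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(worker, args=(world_size,), nprocs=world_size)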

1.4 Inefficiencies

DP has the following defects:

  • Redundant data copies.

    • Data is first copied from the host to the master GPU, and then scattered to the other GPUs.
  • Model replication across GPUs is required before every forward propagation.

    • Since the model parameters are updated on the master GPU, the model must be re-synchronized to the other GPUs at the start of each forward pass.
  • Each batch incurs thread creation/destruction overhead.

    • Parallel forward propagation is implemented with multiple threads (this may simply be an issue of PyTorch's implementation).
  • There is an opportunity to pipeline the gradient reduction, but it is not used.

    • In the PyTorch 1.0.1 data-parallel implementation, gradient reduction happens at the end of backward propagation; it could instead be pipelined with backward propagation.
  • Model outputs are unnecessarily gathered on the master GPU.

  • GPU usage is uneven and the load is unbalanced. The memory footprint and utilization of the master GPU are higher than those of the other cards because:

    • The loss is computed on the master GPU.
    • Both gradient reduction and parameter updates happen on the master GPU.
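The imbalance is easy to observe: after a DP model has run a few iterations, the memory allocated on device_ids[0] is noticeably larger than on the other cards. A rough sketch of such a check (assuming a DP training loop has already been running):

import torch

# Print per-GPU memory after some DP iterations to see the imbalance.
for i in range(torch.cuda.device_count()):
    print(f"cuda:{i} allocated: {torch.cuda.memory_allocated(i) / 1024**2:.1f} MiB, "
          f"peak: {torch.cuda.max_memory_allocated(i) / 1024**2:.1f} MiB")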

0x02 Usage

2.1 Example

Let's look at an example. The logic is as follows:

  • Set the GPUs visible to this program.

    • args.gpu_id = "2,7" and os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu_id, i.e. os.environ['CUDA_VISIBLE_DEVICES'] = "2,7", so that device_ids[0] corresponds to physical card #2 and device_ids[1] corresponds to card #7.
    • Alternatively, specify it at launch time: CUDA_VISIBLE_DEVICES='2,7' python train.py.
  • Put the model parameters and buffers on device_ids[0]. Before running the DataParallel module, the parallelized module must already have its parameters and buffers on device_ids[0].

    • The code is model = model.cuda().
  • Build the DP model. The advantage of DP is that it is very convenient to use: you only need to wrap the original single-card module with DP to get a multi-card module.

    • model = torch.nn.DataParallel(model)
    • DP is itself a PyTorch nn.Module, so the actual model and optimizer need to be accessed through .module.
  • Load the data onto the main GPU.

    • data, label = data.cuda(), label.cuda()
  • Run forward propagation.

    • DP makes a copy of the model module on each device.
    • DP splits the input data into multiple chunks and distributes them to the different GPUs for computation; each model replica only processes the chunk assigned to it.
  • Run backward propagation.

    • DP accumulates the gradients computed on each GPU onto GPU 0 for aggregation.

The specific code is as follows:

args.gpu_id = "2,7"                      # specify which GPUs to use
args.cuda = not args.no_cuda and torch.cuda.is_available()

# Configure the environment; it can also be specified at launch time,
# e.g. CUDA_VISIBLE_DEVICES='2,7' python train.py
os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu_id  # the assignment must be a string
device_ids = range(torch.cuda.device_count())     # torch.cuda.device_count() = 2
# device_ids = [0, 1] can also be used

if args.cuda:
    model = model.cuda()   # copy the model to the default GPU, i.e. device_ids[0]

if len(device_ids) > 1:
    model = torch.nn.DataParallel(model)  # build DP; the model must already be on .cuda()

optimizer = torch.optim.SGD(model.parameters(), args.lr, momentum=args.momentum)

for batch_idx, (data, label) in pbar:
    if args.cuda:
        data, label = data.cuda(), label.cuda()  # put the data on the default GPU
    data_v = Variable(data)
    target_var = Variable(label)
    prediction = model(data_v, target_var, args)
    # The prediction here has been gathered from the two GPUs.
    # Each GPU handles batch_size / len(device_ids) samples; after the forward pass
    # the results are merged onto the master GPU, so len(prediction) equals batch_size.
    criterion = nn.CrossEntropyLoss()
    loss = criterion(prediction, target_var)  # compute the loss on the default GPU
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

2.2 Related knowledge

Before each forward propagation, DP broadcasts the parameters and buffers on the master node to the other nodes to keep the state consistent. The related background is mainly about how the model is copied to the GPU and how GPU kernel functions are invoked; for details, see PyTorch how to use the GPU.

0x03 Definition

3.1 Definition

Let's look at the structure of DataParallel through its initialization function.

The three input arguments of __init__ are:

  • module: the model;
  • device_ids: the devices used for training;
  • output_device: the device on which the gathered outputs are placed; the default is device_ids[0], i.e. the first card.

The code is as follows:

import operator
import torch
import warnings
from itertools import chain
from ..modules import Module
from .scatter_gather import scatter_kwargs, gather
from .replicate import replicate
from .parallel_apply import parallel_apply
from torch._utils import (
    _get_all_device_indices,
    _get_available_device_type,
    _get_device_index,
    _get_devices_properties
)

class DataParallel(Module):
    # TODO: update notes/cuda.rst when this class handles 8+ GPUs well

    def __init__(self, module, device_ids=None, output_device=None, dim=0):
        super(DataParallel, self).__init__()

        # Get the available GPU device type
        device_type = _get_available_device_type()
        if device_type is None:
            self.module = module
            self.device_ids = []
            return

        # Use all visible GPUs by default
        if device_ids is None:
            device_ids = _get_all_device_indices()

        # The default output device is the first card
        if output_device is None:
            output_device = device_ids[0]

        self.dim = dim
        self.module = module
        self.device_ids = [_get_device_index(x, True) for x in device_ids]
        self.output_device = _get_device_index(output_device, True)
        self.src_device_obj = torch.device(device_type, self.device_ids[0])

        # Check load balancing
        _check_balance(self.device_ids)

        self.module.to(self.src_device_obj)
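For instance, the constructor can be called with explicit devices (a small usage sketch; the device ids and the toy model are only examples):

import torch
import torch.nn as nn

model = nn.Linear(10, 2).cuda(0)         # parameters and buffers must already be on device_ids[0]
dp_model = nn.DataParallel(
    model,
    device_ids=[0, 1],                   # GPUs used for training
    output_device=0,                     # where the outputs are gathered (defaults to device_ids[0])
)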

3.2 Load Balancing

Although the input data is split evenly and processed in parallel, the outputs and the loss are gathered and computed on the first GPU every time, so the memory load and utilization of the first GPU are higher than those of the other cards.

The _check_balance function checks whether the devices are balanced, issuing a warning if, for either memory or multiprocessor count, the min/max ratio across GPUs is less than 0.75.

def _check_balance(device_ids):
    imbalance_warn = """
    There is an imbalance between your GPUs. You may want to exclude GPU {} which
    has less than 75% of the memory or cores of GPU {}. You can do so by setting
    the device_ids argument to DataParallel, or by setting the CUDA_VISIBLE_DEVICES
    environment variable."""
    device_ids = [_get_device_index(x, True) for x in device_ids]
    dev_props = _get_devices_properties(device_ids)

    def warn_imbalance(get_prop):
        values = [get_prop(props) for props in dev_props]
        min_pos, min_val = min(enumerate(values), key=operator.itemgetter(1))
        max_pos, max_val = max(enumerate(values), key=operator.itemgetter(1))
        if min_val / max_val < 0.75:
            warnings.warn(imbalance_warn.format(device_ids[min_pos], device_ids[max_pos]))
            return True
        return False

    if warn_imbalance(lambda props: props.total_memory):
        return
    if warn_imbalance(lambda props: props.multi_processor_count):
        return
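The two properties being compared can be inspected directly with the public torch.cuda API (a small sketch of the same check, using public calls rather than the internal helpers):

import torch

# Print the two properties that _check_balance compares across GPUs.
props = [torch.cuda.get_device_properties(i) for i in range(torch.cuda.device_count())]
for i, p in enumerate(props):
    print(f"cuda:{i}: {p.total_memory / 1024**3:.1f} GiB, {p.multi_processor_count} SMs")

mems = [p.total_memory for p in props]
if mems and min(mems) / max(mems) < 0.75:
    print("imbalanced: the weakest GPU has less than 75% of the memory of the strongest one")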

0x04 Forward Propagation

In DataParallel, parallel computation only happens during forward propagation.

4.1 Overview

In the previous example, cuda() has already been used to put the model on GPU[0], so the model's parameters and buffers already live there:

model=model.cuda() 

Therefore, the forward function does not need to perform that step; instead, it starts by distributing the data and the model. Note that the model is re-distributed on every forward pass. The process can be divided into several steps:

  • Verify: iterate over the module's parameters and buffers to check that they are all on GPU[0]; if not, an error is raised.
  • Scatter the input data: the input is split along the first dimension (usually the batch dimension) into multiple chunks and sent to the GPUs.
  • Replicate the model: copy the model onto the GPUs.
  • parallel_apply: run forward propagation on the multiple replicas in parallel. Since GPU device_ids[0] and the base parallelized module share storage, in-place updates on device[0] are preserved, but not on the other GPUs.
  • Gather: collect the outputs sent back from the GPUs.

The specific code is as follows:

def forward(self, *inputs, **kwargs):
    with torch.autograd.profiler.record_function("DataParallel.forward"):

        # If there is no available GPU, just run the module as-is
        if not self.device_ids:
            return self.module(*inputs, **kwargs)

        # Check that the module's parameters and buffers are all on GPU[0]
        for t in chain(self.module.parameters(), self.module.buffers()):
            if t.device != self.src_device_obj:
                raise RuntimeError("module must have its parameters and buffers "
                                   "on device {} (device_ids[0]) but found one of "
                                   "them on device: {}".format(self.src_device_obj, t.device))

        # The model is already on GPU[0]; now scatter the inputs to the GPUs
        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)

        # for forward function without any inputs, empty list and dict will be created
        # so the module can be executed on one device which is the first one in device_ids
        if not inputs and not kwargs:
            inputs = ((),)
            kwargs = ({},)

        # If there is only one GPU, run directly on it
        if len(self.device_ids) == 1:
            return self.module(*inputs[0], **kwargs[0])

        # Replicate the model onto the GPUs
        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])

        # Run forward propagation in parallel
        outputs = self.parallel_apply(replicas, inputs, kwargs)

        # Gather the outputs onto the master GPU
        return self.gather(outputs, self.output_device)
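The whole forward pass can be approximated with the public functional primitives exported by torch.nn.parallel. The following is a rough, simplified sketch of what forward does (an illustration assuming two GPUs, ignoring kwargs and edge cases; the toy model is an assumption):

import torch
import torch.nn as nn
from torch.nn.parallel import replicate, scatter, parallel_apply, gather

module = nn.Linear(10, 2).cuda(0)
device_ids = [0, 1]
x = torch.randn(32, 10).cuda(0)

inputs = scatter(x, device_ids)                              # split the batch across GPUs
replicas = replicate(module, device_ids[:len(inputs)])       # copy the model to each GPU
outputs = parallel_apply(replicas, [(i,) for i in inputs])   # forward in parallel threads
result = gather(outputs, target_device=0)                    # merge the outputs on the master GPU
print(result.shape)                                          # torch.Size([32, 2])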

4.2 Distribution (Input)

In the code above, the following statement completes the data distribution operation.

inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)

In the propagation figure, this corresponds to the scatter step.

So let’s see how it’s distributed.

The scatter method of DataParallel actually just calls scatter_kwargs:

    def scatter(self, inputs, kwargs, device_ids):

        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

4.2.1 scatter_kwargs

scatter_kwargs calls scatter to distribute both inputs and kwargs.

def scatter_kwargs(inputs, kwargs, target_gpus, dim=0):
    r"""Scatter with support for kwargs dictionary"""
    # Scatter the positional inputs
    inputs = scatter(inputs, target_gpus, dim) if inputs else []
    # Scatter the keyword arguments
    kwargs = scatter(kwargs, target_gpus, dim) if kwargs else []
    # Pad with empty tuples / dicts so that inputs and kwargs have the same length
    if len(inputs) < len(kwargs):
        inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
    elif len(kwargs) < len(inputs):
        kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
    # Return tuples
    inputs = tuple(inputs)
    kwargs = tuple(kwargs)
    return inputs, kwargs

4.2.2 scatter

As the docstring says, scatter slices tensors into roughly equal chunks and distributes them across the given GPUs, i.e. a batch of data is divided into smaller, approximately equal batches. For other variable types, the behaviour depends on the type; for example, scatter_map is called recursively to handle nested structures.

def scatter(inputs, target_gpus, dim=0):
    r"""
    Slices tensors into approximately equal chunks and
    distributes them across given GPUs. Duplicates
    references to objects that are not tensors.
    """
    def scatter_map(obj):
        if isinstance(obj, torch.Tensor):
            # For tensors, Scatter.apply does the actual splitting
            return Scatter.apply(target_gpus, None, dim, obj)
        if is_namedtuple(obj):
            # Recurse into the fields of the namedtuple
            return [type(obj)(*args) for args in zip(*map(scatter_map, obj))]
        if isinstance(obj, tuple) and len(obj) > 0:
            # Recurse into the elements of the tuple
            return list(zip(*map(scatter_map, obj)))
        if isinstance(obj, list) and len(obj) > 0:
            # Recurse into the elements of the list
            return [list(i) for i in zip(*map(scatter_map, obj))]
        if isinstance(obj, dict) and len(obj) > 0:
            # Recurse into the items of the dict
            return [type(obj)(i) for i in zip(*map(scatter_map, obj.items()))]
        # Non-tensor objects are simply duplicated, one reference per GPU
        return [obj for targets in target_gpus]

    # After scatter_map is called, a scatter_map cell will exist. This cell
    # has a reference to the actual function scatter_map, which has references
    # to a closure that has a reference to the scatter_map cell (because the
    # fn is recursive). To avoid this reference cycle, we set the function to
    # None, clearing the cell
    try:
        res = scatter_map(inputs)
    finally:
        scatter_map = None
    return res
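A small experiment (a sketch assuming at least two visible GPUs; the payload is an arbitrary example) shows the recursion on a nested structure:

import torch
from torch.nn.parallel import scatter

batch = torch.arange(8).reshape(8, 1).float().cuda(0)
payload = (batch, {"mask": torch.ones(8, 1).cuda(0)})    # a tuple containing a tensor and a dict

chunks = scatter(payload, [0, 1])
# chunks has one entry per GPU; each entry mirrors the original structure,
# e.g. chunks[1][0] is the second half of `batch`, already on cuda:1.
print(chunks[1][0].device, chunks[1][0].shape)           # cuda:1 torch.Size([4, 1])
print(chunks[1][1]["mask"].device)                       # cuda:1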

4.2.3 Scatter

Scatter.apply handles the tensors, so let's take a look. Scatter extends Function, and its logic is as follows:

  • If CUDA is available and the input is on the CPU, obtain a list of streams so that the CPU-to-GPU copies can be performed on background streams.
  • Call comm.scatter to do the distribution.
  • Call wait_stream and record_stream to synchronize with the copy streams.
class Scatter(Function):

    @staticmethod
    def forward(ctx, target_gpus, chunk_sizes, dim, input):
        target_gpus = [_get_device_index(x, True) for x in target_gpus]
        ctx.dim = dim
        ctx.input_device = input.get_device() if input.device.type != "cpu" else -1
        streams = None
        if torch.cuda.is_available() and ctx.input_device == -1:
            # Perform CPU to GPU copies in a background stream
            streams = [_get_stream(device) for device in target_gpus]
        # Call comm.scatter to do the actual distribution
        outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
        # Synchronize with the copy stream
        if streams is not None:
            for i, output in enumerate(outputs):
                with torch.cuda.device(target_gpus[i]):
                    main_stream = torch.cuda.current_stream()
                    main_stream.wait_stream(streams[i])
                    output.record_stream(main_stream)
        return outputs

    @staticmethod
    def backward(ctx, *grad_output):
        return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)
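The stream handling above follows a common pattern: copy on a side stream, then make the compute stream wait on it and pin the tensor's memory to the compute stream. A standalone sketch of that pattern (not the DataParallel code itself; sizes and device are arbitrary assumptions):

import torch

src = torch.randn(1024, 1024, pin_memory=True)       # CPU tensor in page-locked memory
copy_stream = torch.cuda.Stream(device=0)             # background stream for the H2D copy

with torch.cuda.stream(copy_stream):
    dst = src.to("cuda:0", non_blocking=True)         # asynchronous copy on the side stream

main_stream = torch.cuda.current_stream(device=0)
main_stream.wait_stream(copy_stream)                  # compute must wait for the copy
dst.record_stream(main_stream)                        # keep dst's memory alive for main_stream

result = dst.sum()                                    # safe to use dst on the main stream now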

4.2.4 comm.scatter

This function basically calls torch._C._scatter, which takes us into the C++ world.

​
def scatter(tensor, devices=None, chunk_sizes=None, dim=0, streams=None, *, out=None):
    """Scatters tensor across multiple GPUs. """
    tensor = _handle_complex(tensor)
    if out is None:
        devices = [_get_device_index(d) for d in devices]
        return tuple(torch._C._scatter(tensor, devices, chunk_sizes, dim, streams))
    else:
        return tuple(torch._C._scatter_out(tensor, out, dim, streams))

4.2.5 C++

In the binding (conversion) file, we can see that _scatter corresponds to scatter, which is the function we want to analyze.

.def(
    "_scatter",
    [](at::Tensor& tensor,
       std::vector<int64_t>& devices,
       c10::optional<std::vector<int64_t>> chunk_sizes,
       int64_t dim,
       c10::optional<py::object> py_streams) {
      c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>> streams;
      if (py_streams) {
        py::handle handle = *py_streams;
        streams = THPUtils_PySequence_to_CUDAStreamList(handle.ptr());
      }
      // Note: We're holding the GIL up to here.
      pybind11::gil_scoped_release no_gil;
      // Call the scatter implementation
      return scatter(tensor, devices, chunk_sizes, dim, streams);
    },
    py::arg("tensor"),
    py::arg("devices"),
    py::arg("chunk_sizes"),
    py::arg("dim"),
    py::arg("streams"))

scatter distributes the data to each GPU. The logic is as follows:

  • First, the tensor is split into chunks by calling split_with_sizes or chunk.
  • Then the chunks are distributed to the corresponding GPUs via to().
std::vector<at::Tensor> scatter(
    const at::Tensor& tensor,
    at::IntArrayRef devices,
    const c10::optional<std::vector<int64_t>>& chunk_sizes,
    int64_t dim,
    const c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>>& streams) {
  dim = at::maybe_wrap_dim(dim, tensor);
  // Split the tensor into chunks
  std::vector<at::Tensor> chunks = chunk_sizes
      ? tensor.split_with_sizes(/*split_sizes=*/*chunk_sizes, /*dim=*/dim)
      : tensor.chunk(/*chunks=*/devices.size(), /*dim=*/dim);
  at::cuda::OptionalCUDAStreamGuard cuda_guard;
  // Copy each chunk to its target GPU
  for (size_t i = 0; i < chunks.size(); ++i) {
    const auto device_index = static_cast<int16_t>(devices[i]);
    if (device_index != tensor.get_device()) {
      if (i < (streams ? streams->size() : 0U) && (*streams)[i]) {
        cuda_guard.reset_stream(*(*streams)[i]);
      }
      chunks[i] = chunks[i].to(   // copy to the target device
          {DeviceType::CUDA, device_index},
          /*non_blocking=*/true,
          /*copy=*/false,
          /*memory_format=*/at::MemoryFormat::Preserve);
    }
  }
  return chunks;  // return the chunks
}
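On the Python side the same splitting behaviour can be reproduced with chunk and split (a small illustration; the sizes are arbitrary):

import torch

batch = torch.randn(10, 3)

# chunk: approximately equal pieces, one per device
pieces = batch.chunk(3, dim=0)
print([p.shape[0] for p in pieces])        # [4, 4, 2]

# split with explicit sizes, like split_with_sizes in the C++ code
pieces = batch.split([5, 3, 2], dim=0)
print([p.shape[0] for p in pieces])        # [5, 3, 2]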

4.3 Replication (Model)

So far, the Scatter function has been used to distribute and copy the data from device[0] to the different cards. Now the replicate function is used to copy the model from device[0] to the different cards.

replicas = self.replicate(self.module, self.device_ids[:len(inputs)])

In the propagation figure, this corresponds to the replicate step.

The replicate method is just a thin wrapper; we need to look at the underlying replicate function.

def replicate(self, module, device_ids):
    return replicate(module, device_ids, not torch.is_grad_enabled())

4.3.1 replicate

The concrete logic of replicate is as follows:

  • Use _replicatable_module to check whether the model can be safely replicated.

  • Check how many GPUs there are, i.e. how many copies need to be made.

  • Copy operations.

    • Copy the parameters.

      • Use _broadcast_coalesced_reshape to copy the parameters to each GPU.
    • Copy the buffers.

      • First count the buffers.
      • Record the indices of the buffers that require gradients.
      • Record the indices of the buffers that do not require gradients.
      • For both kinds of buffers, use _broadcast_coalesced_reshape to copy them to each GPU.
    • Copy the model.

      • modules() returns an iterator over all modules of the model; turning it into a list effectively flattens the model.
      • Iterate over the modules and append a shallow copy of each layer to every entry of module_copies.
      • In the end, module_copies[j] contains every layer of the model, i.e. module_copies[j][i] is layer i of replica j.
  • Configuration operations.

    • Configure the model network: wire the references to the data already on the GPUs into each module of the module_copies array, so that these copies become complete models.

    • This is needed because the nested model network was broken apart when it was copied to the GPU: buffers and parameters were copied to the GPUs separately. Now they are set back into the shallow-copied modules, which completes the model logic.

    • Iterate over each sub-module of the model and configure only what is needed:

      • Process its _modules.
      • Process its _parameters.
      • Process its _buffers.
  • In the subsequent parallel operations, each worker gets one entry of the module_copies array and trains on that replica.

The specific code is as follows:

def replicate(network, devices, detach=False):
    if not _replicatable_module(network):
        raise RuntimeError("Cannot replicate network where python modules are "
                           "childrens of ScriptModule")

    if not devices:
        return []

    # See how many GPUs there are, i.e. how many copies need to be made
    devices = [_get_device_index(x, True) for x in devices]
    num_replicas = len(devices)

    # Copy the parameters
    params = list(network.parameters())
    param_indices = {param: idx for idx, param in enumerate(params)}
    # Use _broadcast_coalesced_reshape to copy the parameters to each GPU
    param_copies = _broadcast_coalesced_reshape(params, devices, detach)

    # Copy the buffers
    buffers = list(network.buffers())
    buffers_rg = []       # buffers that require grad
    buffers_not_rg = []   # buffers that do not require grad
    for buf in buffers:
        if buf.requires_grad and not detach:
            buffers_rg.append(buf)
        else:
            buffers_not_rg.append(buf)

    # Record the indices of the buffers that require grad
    buffer_indices_rg = {buf: idx for idx, buf in enumerate(buffers_rg)}
    # Record the indices of the buffers that do not require grad
    buffer_indices_not_rg = {buf: idx for idx, buf in enumerate(buffers_not_rg)}

    # Copy both kinds of buffers to each GPU
    buffer_copies_rg = _broadcast_coalesced_reshape(buffers_rg, devices, detach=detach)
    buffer_copies_not_rg = _broadcast_coalesced_reshape(buffers_not_rg, devices, detach=True)

    # Prepare to copy the model network.
    # modules() returns an iterator over all modules of the model;
    # turning it into a list flattens the model.
    modules = list(network.modules())
    module_copies = [[] for device in devices]  # one (initially empty) list per device
    module_indices = {}

    # Make a shallow copy of every layer for every replica
    for i, module in enumerate(modules):
        module_indices[module] = i
        for j in range(num_replicas):
            replica = module._replicate_for_data_parallel()  # shallow copy
            # This is a temporary fix for DDP. DDP needs to access the
            # replicated model parameters. It used to do so through
            # `mode.parameters()`. The fix added in #33907 for DP stops the
            # `parameters()` API from exposing the replicated parameters.
            # Hence, we add a `_former_parameters` dict here to support DDP.
            replica._former_parameters = OrderedDict()
            module_copies[j].append(replica)  # add this layer to each replica list

    # Configuration step: assign the references to the data already on each GPU
    # back into the shallow copies so that they become complete models. The nested
    # model network was broken apart and its parameters / buffers were copied to
    # the GPUs separately; here they are wired back into the replicas.
    for i, module in enumerate(modules):

        # Handle its _modules
        for key, child in module._modules.items():
            if child is None:
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    replica._modules[key] = None
            else:
                module_idx = module_indices[child]
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    setattr(replica, key, module_copies[j][module_idx])

        # Handle its _parameters
        for key, param in module._parameters.items():
            if param is None:
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    replica._parameters[key] = None
            else:
                param_idx = param_indices[param]
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    param = param_copies[j][param_idx]
                    # parameters in replicas are no longer leaves,
                    # so setattr them as non-parameter attributes
                    setattr(replica, key, param)
                    # expose the parameter for DDP
                    replica._former_parameters[key] = param

        # Handle its _buffers
        for key, buf in module._buffers.items():
            if buf is None:
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    replica._buffers[key] = None
            else:
                if buf.requires_grad and not detach:
                    buffer_copies = buffer_copies_rg
                    buffer_idx = buffer_indices_rg[buf]
                else:
                    buffer_copies = buffer_copies_not_rg
                    buffer_idx = buffer_indices_not_rg[buf]
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    setattr(replica, key, buffer_copies[j][buffer_idx])

    return [module_copies[j][0] for j in range(num_replicas)]

4.3.2 Checking whether the module can be replicated

_replicatable_module checks whether the model can be safely copied.

​
# Check if we can safely replicate the module.
# there are two types of module:
# 1. python modules
# 2. ScriptModule
#
# currently a module cannot be replicated properly if the descendants of
# any ScriptModule contains python module (type 1 above)
def _replicatable_module(module, memo=None):
​
    # module.modules() contains module itself as the first element
    def descendant_modules(module):
        gen = module.modules()
        next(gen)
        return gen
​
    if not _is_jit_enabled():
        return True
    if memo is None:
        memo = set()
​
    # memoize visited modules
    memo.add(module)
    if _is_script_module(module):
        memo.update(descendant_modules(module))
        return all(_is_script_module(descendant) for
                   descendant in descendant_modules(module))
​
    for child in module.children():
        # since any unreplicatable module will cause the check to return
        # False early, visited modules here can be safely ignored.
        if child in memo:
            continue
        if not _replicatable_module(child, memo):
            return False
​
    return True

4.3.3 Shallow copies

In PyTorch, there are shallow copies and deep copies.

The model object essentially holds references (pointers) to a series of internal parameter matrices.

  • A shallow copy copies only the outermost values and pointers; it does not copy the deeper objects, i.e. it copies only the parent object. model.state_dict() is also a shallow copy: if you set param = model.state_dict(), then modifying param modifies the model's parameters accordingly.
  • A deep copy, on the other hand, copies the values, the pointers, and the deeper memory the pointers refer to, i.e. it copies the parent object and its children.

For example:

import torch
import copy

a = torch.nn.Linear(in_features=5, out_features=1, bias=True)
b = copy.copy(a)

# state_dict is a shallow copy
p = a.state_dict()
print(id(a.state_dict()) == id(p))

print(a.weight)                 # original weight
p['weight'][0][0] = 8.8888      # modify through the shallow copy
print(a.weight)                 # the model's weight has changed too

The output is as follows:

False
Parameter containing:
tensor([[-0.2253,  0.0802,  0.3984, -0.1208,  0.3796]], requires_grad=True)
Parameter containing:
tensor([[ 8.8888,  0.0802,  0.3984, -0.1208,  0.3796]], requires_grad=True)

Back to our analysis: the Module class provides the _replicate_for_data_parallel method, which returns a replica that shares storage with the original model, i.e. a shallow copy.

def _replicate_for_data_parallel(self):
    replica = self.__new__(type(self))
    replica.__dict__ = self.__dict__.copy()

    # replicas do not have parameters themselves, the replicas reference the original
    # module.
    replica._parameters = OrderedDict()
    replica._buffers = replica._buffers.copy()   # shallow copy of the buffers
    replica._modules = replica._modules.copy()   # shallow copy of the internal sub-modules
    replica._is_replica = True

    return replica

It can be assumed that before the configuration operation, the copies look like this:

(Diagram: before the configuration step, the original Module lives on the CPU with its _parameters, _buffers and _modules; the shallow copies module_copies[0] and module_copies[1] still point back to the original Module's _buffers and _modules, while GPU 0 and GPU 1 already hold the broadcast _parameters and _buffers, not yet referenced by any replica.)

After the configuration operation, it looks like this:

(Diagram: after the configuration step, the _parameters and _buffers of module_copies[0] reference the copies on GPU 0, and the _parameters and _buffers of module_copies[1] reference the copies on GPU 1; the original Module on the CPU is unchanged.)

4.3.4 Copy Operations

4.3.4.1 _broadcast_coalesced_reshape

The copy operations all go through _broadcast_coalesced_reshape.

def _broadcast_coalesced_reshape(tensors, devices, detach=False):
    from ._functions import Broadcast
    if detach:
        # If detached, call comm.broadcast_coalesced directly
        return comm.broadcast_coalesced(tensors, devices)
    else:
        # Use the autograd function to broadcast if not detach
        if len(tensors) > 0:
            # Go through Broadcast first, which in turn calls broadcast_coalesced
            tensor_copies = Broadcast.apply(devices, *tensors)
            return [tensor_copies[i:i + len(tensors)]
                    for i in range(0, len(tensor_copies), len(tensors))]
        else:
            return []
4.3.4.2 Broadcast

The reason for going through the Broadcast autograd Function is that, when the tensors are not detached, we need not only to broadcast them but also to record in the context which outputs do not require gradients. In some cases, user-defined Functions may need to be aware of this.

class Broadcast(Function):

    @staticmethod
    def forward(ctx, target_gpus, *inputs):
        assert all(i.device.type != 'cpu' for i in inputs), (
            'Broadcast function not implemented for CPU tensors'
        )
        target_gpus = [_get_device_index(x, True) for x in target_gpus]
        ctx.target_gpus = target_gpus
        if len(inputs) == 0:
            return tuple()
        ctx.num_inputs = len(inputs)
        # The inputs live on device[0]
        ctx.input_device = inputs[0].get_device()
        # Do the broadcast
        outputs = comm.broadcast_coalesced(inputs, ctx.target_gpus)
        non_differentiables = []
        # Record in the context which outputs do not require gradients
        for idx, input_requires_grad in enumerate(ctx.needs_input_grad[1:]):
            if not input_requires_grad:
                for output in outputs:
                    non_differentiables.append(output[idx])
        ctx.mark_non_differentiable(*non_differentiables)
        return tuple([t for tensors in outputs for t in tensors])

    @staticmethod
    def backward(ctx, *grad_outputs):
        return (None,) + ReduceAddCoalesced.apply(ctx.input_device, ctx.num_inputs, *grad_outputs)

mark_non_differentiable is defined in torch/csrc/autograd/custom_function.cpp; it records the non-differentiable output variables in the AutogradContext.

void AutogradContext::mark_non_differentiable(const variable_list &outputs) {
  non_differentiable_.clear();
  non_differentiable_.reserve(outputs.size());
  for(auto& var : outputs) {
    non_differentiable_.insert(var.unsafeGetTensorImpl());
  }
}
4.3.4.3 broadcast_coalesced

broadcast_coalesced then jumps into the C++ world.

def broadcast_coalesced(tensors, devices, buffer_size=10485760):
    """Broadcasts a sequence tensors to the specified GPUs.
    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.
​
    Args:
        tensors (sequence): tensors to broadcast. Must be on the same device,
          either CPU or GPU.
        devices (Iterable[torch.device, str or int]): an iterable of GPU
          devices, among which to broadcast.
        buffer_size (int): maximum size of the buffer used for coalescing
​
    Returns:
        A tuple containing copies of :attr:`tensor`, placed on :attr:`devices`.
    """
    devices = [_get_device_index(d) for d in devices]
    tensors = [_handle_complex(t) for t in tensors]
    return torch._C._broadcast_coalesced(tensors, devices, buffer_size)
​
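This Python wrapper is exposed as torch.cuda.comm.broadcast_coalesced, so it can be tried directly (a small sketch assuming at least two visible GPUs; the tensor shapes are arbitrary):

import torch
import torch.cuda.comm as comm

tensors = [torch.randn(4, 4, device="cuda:0"), torch.randn(100, device="cuda:0")]
copies = comm.broadcast_coalesced(tensors, devices=[0, 1])

# copies[i] holds the copies of both tensors on device i
print(copies[1][0].device, copies[1][1].device)            # cuda:1 cuda:1
print(torch.equal(copies[1][0].cpu(), tensors[0].cpu()))   # True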
4.3.4.4 C++

As can be seen from the initialization (binding) code, _broadcast_coalesced is implemented by the C++ broadcast_coalesced.

  auto m = py::cast<py::module>(module);
  m.def(
       "_broadcast_coalesced",
       [](std::vector<at::Tensor>& tensors,
          std::vector<int64_t> devices,
          size_t buffer_size) {
         return broadcast_coalesced(tensors, devices, buffer_size);
       },
       py::arg("tensors"),
       py::arg("devices"),
       py::arg("buffer_size"),
       py::call_guard<py::gil_scoped_release>())
  • broadcast_coalesced distributes variables to all GPUs. In broadcast_coalesced, multiple variables may be coalesced into one large variable, broadcast to the other devices, and then split back according to the original shapes.
  • When splitting, the view operations make all the broadcast variables share a single version counter, because they are all views of the large coalesced variable. However, that large variable is immediately discarded, and these variables do not share storage at all.
  • For example, when two buffers are broadcast together in DataParallel and one of them is modified in place during forward while the other is needed in backward, the autograd engine will complain. Therefore, these variables are re-wrapped after the broadcast and given individual version counters.

The code is located in torch/csrc/cuda/comm.cpp. Let's first look at the comments.

// broadcast_coalesced
// ~~~~~~~~~~~~~~~~~~~
//
// In broadcast_coalesced, multiple variables may be coalesced into a single
// large one, broadcast to other devices, and the get split according to the
// original shapes.
//
// When splitting, the view operations will make all Variables broadcast
// together to share a single version counter, because they are all views of the
// large Variable. However, that large Variable is immediately discarded and all
// these Variables do not share storage at all.
//
// For example, when two buffers are broadcast together in `DataParallel` and
// one of them is modified in-place during `forward` but the other is needed in
// backward, autograd engine will complain.
//
// We thus re-wrap these Variables after broadcasting (i.e., effectively doing
// what is equivalent to .data in Python), and give them individual version
// counters.

The parameters of broadcast_coalesced are as follows:

  • tensors must all be on the same device, CPU or GPU;
  • devices are the target devices to copy to;
  • buffer_size is the maximum buffer size; small tensors are coalesced into this buffer to reduce the number of synchronizations.
tensor_list2d broadcast_coalesced(
    TensorList tensors,
    IntArrayRef devices,
    size_t buffer_size) {
  TORCH_CHECK(
      std::all_of(
          tensors.begin(),
          tensors.end(),
          [&](const at::Tensor& t) { return t.get_device() == devices[0]; }),
      "All tensors must be on devices[0]: ",
      devices[0]);
#ifdef USE_NCCL
  buffer_size = std::min(torch::cuda::nccl::get_max_count(), buffer_size);
#endif

  tensor_list2d outputs(devices.size());
  outputs[0] = tensors.vec();
  for (auto& o : outputs)
    o.reserve(tensors.size());

  unique_type_checker type_checker;
  at::cuda::CUDAGuard device_guard(devices[0]);
  for (auto& chunk : utils::take_tensors(tensors, buffer_size)) {
    auto type_id = chunk.type_id();
    type_checker.show(type_id);
    std::vector<at::Tensor> results;
    if (chunk.options().is_sparse()) {
      auto flat_tuple = utils::flatten_sparse_tensors(chunk.tensors);
      auto broadcast_indices = broadcast(flat_tuple.first, devices);   // broadcast the indices
      auto broadcast_values = broadcast(flat_tuple.second, devices);   // broadcast the values
      results.reserve(devices.size());
      for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
        device_guard.set_index(devices[i]);
        auto& device_outputs = outputs[i];
        auto& inds = broadcast_indices[i];
        auto& vals = broadcast_values[i];
        for (auto& t :
             utils::unflatten_sparse_tensors(inds, vals, chunk.tensors)) {
          Variable var = t;
          device_outputs.push_back(make_variable(var.tensor_data(), false));
        }
      }
    } else {
      // Flatten the dense tensors of this chunk and broadcast them
      auto results =
          broadcast(utils::flatten_dense_tensors(chunk.tensors), devices);
      for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
        device_guard.set_index(devices[i]);
        auto& device_outputs = outputs[i];
        for (auto& t :
             utils::unflatten_dense_tensors(results[i], chunk.tensors)) {
          Variable var = t;
          device_outputs.push_back(make_variable(var.tensor_data(), false));
        }
      }
    }
  }

  // If we only saw a single tensor type, then we can skip expensive reordering
  if (!type_checker.unique) {
    for (auto& o : outputs)
      utils::reorder_tensors_like(o, tensors);
  }
  return outputs;
}

The broadcast method is as follows:

std::vector<Tensor> broadcast(const Tensor& tensor, IntArrayRef devices) {
  std::vector<Tensor> diff_device_dst_tensors;
  diff_device_dst_tensors.reserve(devices.size());
  for (auto device : devices) {
    if (device != tensor.get_device()) {
      diff_device_dst_tensors.push_back(at::empty(
          tensor.sizes(),
          tensor.options().device(
              at::Device(DeviceType::CUDA, device))));  // preserve memory format
    }
  }
  // Do the actual broadcast into the destination tensors
  _broadcast_out_impl(tensor, diff_device_dst_tensors);
  std::vector<Tensor> dst_tensors;
  dst_tensors.reserve(devices.size());
  auto it = diff_device_dst_tensors.begin();
  for (auto device : devices) {
    if (device != tensor.get_device()) {
      dst_tensors.push_back(*it++);
    } else {
      dst_tensors.push_back(tensor);
    }
  }
  TORCH_INTERNAL_ASSERT(it == diff_device_dst_tensors.end());
  return dst_tensors;
}

The final call, _broadcast_out_impl, broadcasts the source tensor (CPU or CUDA) to a list of destination tensors on CUDA devices; when NCCL is available it calls nccl::broadcast(nccl_list).

static inline std::vector<Tensor>& _broadcast_out_impl(
    const Tensor& tensor,
    std::vector<Tensor>& out_tensors) {
#ifdef USE_NCCL
  std::vector<Tensor> nccl_list;
  nccl_list.reserve(out_tensors.size() + 1);
  nccl_list.push_back(tensor);
  for (auto& out_tensor : out_tensors) {
    nccl_list.push_back(out_tensor);
  }
  if (nccl::is_available(nccl_list)) {
    nccl::broadcast(nccl_list);
  } else {
#else
  {
#endif
    for (auto& out_tensor : out_tensors) {
      out_tensor.copy_(tensor, /*non_blocking=*/true);
    }
  }
  return out_tensors;
}

At this point, we have distributed both the data and the model to the other GPUs. Along the way, replicate called Broadcast.forward, which stored input_device and num_inputs into its context. Now the parallel forward propagation can be carried out.

+------------------------------------------------------------------+
| DataParallel.forward                                             |
|                                                                  |
|    replicate +----------->  parallel_apply  +---------->  gather |
|        +                                                         |
+------------------------------------------------------------------+
         |
         |  forward()
         v
+---------------------------+
| Broadcast                 |
|                           |
|    +---------------------+|
|    | ctx                 ||
|    |    input_device     ||
|    |    num_inputs       ||
|    +---------------------+|
+---------------------------+

Due to space constraints, we will continue our analysis from parallel operation (forward propagation) in the next article.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

PyTorch source code interpretation torch. Optim: optimization algorithm interface details

Pytorch (distributed) data parallel personal practice – DataParallel/DistributedDataParallel

The nn Pytorch DataParallel

PyTorch source code interpretation of distributed training to understand?

Discuss.pytorch.org/t/dataparal…

PyTorch DDP series 2: implementation principle and source code analysis

Pytorch-CUDA From Getting Started to Giving Up (Part 2)

Pytorch pits: The differences between assignment, shallow copy, and deep copy, and the pits for model.state_dict() and model.load_state_dict()

DP & DDP for PyTorch source Code Interpretation: Model parallel and Distributed training parsing