0 x00 the

Bagua is a distributed training framework jointly developed by Kuaishou and ETH Zurich. It designs specific optimization algorithms for distributed scenarios, realizes joint optimization of algorithm and system level, and tries to maximize the efficiency of distributed training. Its characteristics are:

  • The parallel performance is significantly improved.

  • More robust to network environment;

  • “One-click” use;

  • Distributed communication algorithm extension;

  • Can be used for large-scale use in industrial scenes;

  • Safety, easy to troubleshoot;

In this paper:

  • Quick hand official public number article quick hand gossip! Here comes an open source distributed training framework that breaks the parallel bottleneck of TensorFlow and PyTorch.
  • “Bagua” paper arxiv.org/pdf/2107.01…
  • Bagua’s official website tutorials.baguasys.com/
  • Bagua demo document
  • GitHub address: github.com/BaguaSys/ba…

To analyze learning on the basis of. This paper studies “Bagua” overall design ideas and load balancing data loader.

0x01 Design Idea

The following excerpts from kuaishou official post kuaishou gossip! Here comes an open source distributed training framework that breaks the parallel bottleneck of TensorFlow and PyTorch. And ETH PPT, according to their own understanding of the adjustment.

1.1 How to Communicate

In data parallelism, the core of the training from single machine and single card to multi-machine and multi-card is that each card accumulates and disseminates its own calculation results. Therefore, a key point is how to communicate between two workers.

This process is like each person passing on his own information to others, and then getting information from others, and finally completing the global information synchronization. If the information synchronization between cells is analogous to the information synchronization between people, then social experience tells us that “gossip” may be the most efficient mode of message transmission. “Bagua” news dissemination has the characteristics of decentralization, asynchronous communication and information compression, which exactly corresponds to the communication algorithm implemented in Bagua.

1.2 Communication mode classification

The communication modes are classified as follows.

1.2.1 System Architecture

According to the system architecture, parameter server and Allreduce.

The figure below is a legend of the parametric server and Allreduce paradigm.

  • inParameter serverIn an architecture, models can be shards and distributed to multiple nodes (we call these “parameter servers”). In the training phase, the worker regularly obtains the model from the parameter server, uses computing units (such as GPU) for forward and backward propagation, and pushes gradients to the parameter server, which collects gradients and updates parameters.
  • inAllreduceIn the paradigm, all workers cooperate with their neighbors for model/gradient exchange. Existing systems usually use a ring topology for two-stage communication: first, the paradigm divides the model/gradient into N blocks (where N is the number of nodes), and aggregates n blocks using n rings with different starting and ending points. Second, the aggregation results of each block at a different node are broadcast within the ring.

1.2.2 Synchronization Angle

It can be Synchronous or Asynchronous in communication synchronization:

  • In synchronous mode, during each iteration, all working nodes need to communicate, and the next iteration must wait for the communication of the current iteration to complete before starting.
  • On the contrary, the asynchronous distribution algorithm does not need waiting time: when a node completes the calculation, the local gradient can be directly transmitted to update the model.

1.2.3 Communication Topology

From the perspective of communication topology, it can be divided into Centralized or Decentralized:

  • In the centralized communication mode, all working nodes are required to participate in the synchronization process of gradient or model. Therefore, high network delay often leads to the reduction of training efficiency.
  • Decentralized communication mode can effectively solve the above problems: in this mode, working nodes can be connected into a specific topology (such as rings), and each working node only communicates with its adjacent nodes during the communication process.

1. The compression

From the perspective of communication compression, there are two modes: full-precision mode or low-precision mode:

  • The full precision mode transmits the same 32-bit floating-point number (FLOAT32) as the native model.
  • On the other hand, in the case of communication bottleneck, based on a large number of existing studies, the gradient is compressed through quantization or sparsification, and then the compressed gradient is used to update parameters. In many scenarios, the same precision as the full precision can be achieved, while improving communication efficiency.

1.3 the challenge

In the implementation of Kuaishou, there are three challenges:

  • Theoretical basis: communication mode should be supported by theory, and it should be proved that communication is effective and convergent strictly in theory.
  • System design: the existing distributed learning system cannot meet all the new communication modes, so a new system structure needs to be designed to take advantage of the advantages brought by this algorithm.
    • Parameter server basic operation PUT/GET, cannot realize decentralization and error compensation.
    • Allreduce is global and cannot be implemented in a decentralized or asynchronous mode.
  • Evaluation: Various algorithms need to be evaluated in large-scale real-world scenarios.

1.4 Bagua implementation

1.4.1 layered

Bagua is divided into three layers:

  • Algorithm layer: on the basis of the logical layer, the implementation of specific algorithms, such as an algorithm is decentralized, compressed, asynchronous.
  • Logical communication layer: on the basis of the physical communication layer, a variety of communication primitives, such as decentralization, precision, synchronization and so on, are realized. These communication primitives are not specifically designed for a class of algorithms, but are unified to the upper layer.
  • Physical communication layer: This layer integrates some common communication libraries to provide basic send and receive operations.

1.4.2 Communication algorithm Options

For communication mode classification, Bagua abstracts the communication process into the following algorithm options:

  • Centralized or Decentralized.

  • Synchronous or Asynchronous.

  • Full Precision mode or information compression mode (full-precision or low-precision).

In order to improve communication efficiency, Bagua did not synchronize the results of all computing nodes in the traditional way, and even there was deviation in the information of each synchronization. However, thanks to the latest theoretical progress, the correctness and efficiency of the final convergence solution of these communication strategies and their combination can still be fully guaranteed. In addition, the computational complexity is similar to that of synchronous centralized and information lossless methods, but the communication efficiency is higher.

Bagua provides a detailed set of communication modes to support users to choose any combination in the above modes. We summarize the support of this distributed training system for the above algorithm options in the following table:

It can be seen from the table that the optimization of the existing framework is only for the more general algorithm (centralized synchronization complete accuracy), and the support of these systems for other algorithm combinations is very limited. For centralized synchronization of information compression, these systems usually only support the relatively simple float32-> FLOAT16 compression, while Bagua can support more complex ByteGrad, QAdam and other algorithms. For other combinations of algorithms, which are generally not supported by existing frameworks, Bagua is free to support.

1.4.3 overall

At the heart of BAGUA is a training algorithm implemented by developers using the communication primitives and abstractions provided by BAGUA. The algorithm takes the neural network provided by the end user as the input and equiped it with an algorithm-specific communication function. Specifically, the developer of the algorithm registers this communication function as a hook at various stages of execution.

1.4.4 optimization

However, simply supporting algorithmic options does not directly lead to performance gains on large clusters. Bagua’s core advantage lies in the joint optimization of algorithms and implementations for the pursuit of extreme performance. Specifically, based on the above communication layer abstraction, users can not only easily choose the various algorithm combinations provided by the system to achieve performance improvement, but also flexibly implement the new distributed SGD algorithm — Bagua will automatically provide system layer optimization for this algorithm implementation. These system optimizations include:

  • Hide the communication time in the calculation time.
  • Parameter buckets and memory management.
  • Hierarchical communication implementation.

It is important to emphasize that these system implementation level optimizations are applicable to a wide variety of algorithm combinations, rather than being limited to a specific algorithm setup. Therefore, all system optimizations can be flexibly reused in various algorithm implementations, which not only ensures the end-to-end performance improvement, but also provides a good platform for the development of new distributed algorithms.

1.5 the flow chart

Let’s use the legend of the official number to conclude

0x02 Analysis Idea

Through official articles, we can find the following situations for analytical learning:

  • Communication optimization is a major feature of bagua project.
  • The underlying Rust language is unfamiliar to the author.
  • It’s not practical to go through the whole code.

Therefore, we decided to focus on the implementation of centralized, asynchronous communication and hierarchical communication, and then combine several characteristic implementations to study and analyze. This article learns about load balancing data loaders.

0x03 Load Balanced Data Loader

In some scenarios, the computational complexity of samples in training data is different, for example, the length of each sample is different in NLP and speech tasks. In this case, the distributed training throughput can be greatly improved by using the load balancing data loader of Gossip. In this case, the workloads of workers are similar. Let’s start with an example to see how to achieve load balancing for data loading.

Let’s take a look at the load balancing requirements. Suppose we have two copies of the model running data in parallel, with the following data, if these data represent data complexity (which affects calculation time) :

[ 7.1.11.5.10.2.9.4.6.0.8.3]
Copy the code

The first copy of the model receives: [7,11,10,9,6, 8]. The second copy of the model receives: [1,5,2,4,0,3]. It can be seen that the complexity of data received by the two models in each batch is different, resulting in load imbalance.

                         +  8                         + 3
                         |                            |
                         |  6                         | 0
                         |                            |
                         |  9                         | 4
                         |                            |
batch 3   +----------->  |  10                        | 2  <----------+  batch 3
                         |                            |
batch 2   +----------->  |  11                        | 5  <----------+  batch 2
                         |                            |
batch 1   +----------->  v  7                         v 1  <----------+  batch 1

                  +-------------------+        +-------------------+
                  |                   |        |                   |
                  |     worker 0      |        |     worker 1      |
                  |                   |        |                   |
                  |                   |        |                   |
                  +-------------------+        +-------------------+
Copy the code

Ideally, each batch of data complexity received by the two models should be similar. For example, the first model receives [1,3,5,7,9], and the second model receives [2,4,6,8,10]. As shown in the following figure, each batch of data complexity is similar to achieve load balancing effect:

                         +                            +
                         |  9                         | 10
                         |                            |
                         |  7                         | 8
                         |                            |
batch 3   +----------->  |  5                         | 6  <----------+  batch 3
                         |                            |
batch 2   +----------->  |  3                         | 4  <----------+  batch 2
                         |                            |
batch 1   +----------->  v  1                         v 2  <----------+  batch 1

                  +-------------------+        +-------------------+
                  |                   |        |                   |
                  |     worker 0      |        |     worker 1      |
                  |                   |        |                   |
                  |                   |        |                   |
                  +-------------------+        +-------------------+
Copy the code

3.1 the use of

We directly use the source code in the example to modify the study.

import torch
from load_balancing_data_loader import LoadBalancingDistributedSampler
from torch.utils.data import TensorDataset, DataLoader

def test_load_balancing_distributed_batch_sampler() :
    num_replicas = 2 # Split into two copies
    total_batch = 3 

    n = sum([i + 1 for i in range(total_batch)]) * num_replicas
    dataset = TensorDataset(torch.randn(n, 2), torch.randperm(n))

    sampler = LoadBalancingDistributedSampler(
        dataset,
        complexity_fn=lambda x: x[1],
        num_replicas=num_replicas,
        rank=0,
        shuffle=True.# need to shuffle
        random_level=0.5.# add random
    )

    dataloader = torch.utils.data.DataLoader(dataset, sampler=sampler)

    cur_idx = 0
    for i, data in enumerate(dataloader):
        batch_size = data[0].shape[0]
        cur_idx += batch_size * num_replicas
        print(cur_idx)

test_load_balancing_distributed_batch_sampler()
Copy the code

Because the code here is very convoluted, we parse it one by one.

3.2 Generating data sets

The first is the data set generation part. Torch. Randn (n, 2) generates a random tensor, and Torch. Randperm (n) generates a random ordering of n. Let’s say that n is 12.

The data set is generated
n = sum([i + 1 for i in range(total_batch)]) * num_replicas
dataset = TensorDataset(torch.randn(n, 2), torch.randperm(n))
Copy the code

TensorDataset is similar to the zip command, which generates a list of tuples.

dataset = {TensorDataset: 12} 
 tensors = {tuple: 2} (
   
  0 = {Tensor: 12} tensor([[-1.5556.0.6848],\n        [ 2.0811.1.5011],\n        [ 0.7434, -0.4990],\n        [-0.2706.1.7227],\n        [ 0.2179.0.0622],\n        [-0.3014, -0.6435],\n        [-0.1773, -1.3405],\n        [-1.8212.0.3702],\n        [-0.5526, -0.2077],\n        [-1.6543.0.3109],\n        [ 0.3265.0.5987],\n        [-1.5566.0.2854]])
   
   1 = {Tensor: 12} tensor([ 7.8.11.4.5.2.9.10.0.6.1.3])
Copy the code

The current TensorDataset is obtained as follows: 0 is the actual data, 1 is the data complexity, and the purpose of subsequent processing is to sort these tensors according to the data complexity. We can assume that the final sort should be a uniform sort.

+-----------------------------------------------------------------------------+
| TensorDataset                                                               |
|                                                                             |
|   0 = {Tensor: 12} tensor([[-1.5556.0.6848],... | | | |1 = {Tensor: 12} tensor([ 7.8.11.4.5.2.9.10.0.6.1.3) | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- +Copy the code

3.3 the initialization

We came to LoadBalancingDistributedSampler initialization.

def __init__(
    self,
    dataset: Dataset,
    complexity_fn: Callable[...int],
    num_replicas: Optional[int] = None,
    rank: Optional[int] = None,
    shuffle: bool = True,
    seed: int = 0,
    drop_last: bool = False,
    random_level: float = 0.) - >None:
    if num_replicas is None:
        num_replicas = dist.get_world_size()
    if rank is None:
        rank = dist.get_rank()

    self.dataset = dataset
    self.num_replicas = num_replicas
    self.rank = rank
    self.epoch = 0
    self.drop_last = drop_last

    # If the dataset length is evenly divisible by # of replicas, then there
    # is no need to drop any data, since the dataset will be split equally.
    dataset_len = len(self.dataset)  # type: ignore
    if self.drop_last anddataset_len % self.num_replicas ! =0:  # type: ignore
        # Split to nearest available length that is evenly divisible.
        # This is to ensure each rank receives the same amount of data when
        # using this Sampler.
        self.num_samples = math.ceil(
            # `type:ignore` is required because Dataset cannot provide a default __len__
            # see NOTE in pytorch/torch/utils/data/sampler.py
            (dataset_len - self.num_replicas)
            / self.num_replicas
        )
    else:
        self.num_samples = math.ceil(dataset_len / self.num_replicas)  # type: ignore
    self.total_size = self.num_samples * self.num_replicas
    self.shuffle = shuffle
    self.seed = seed

"" "this variable to self = {6} LoadBalancingDistributedSampler: the dataset = {TensorDataset: 12} 
      
        drop_last = {bool} False epoch = {int} 0 num_replicas = {int} 2 num_samples = {int} 6 rank = {int} 0 seed = {int} 0 shuffle = {bool} True total_size = {int} 12 "" "
             
    
    # Here are the main differences from PyTorch native
    self.item_complexity_map = dict(a)for item_index in range(dataset_len):
        Each item has a new complexity
        self.item_complexity_map[item_index] = complexity_fn(
            self.dataset[item_index]
        )

"" Complexity_fn takes the second element of the tuple, let's remember the complexity of the data set {Tensor: Tensor ([7, 8, 11, 4, 5, 2, 9, 10, 0, 6, 1, 3]) = item_complexity_map = {dict: 12} {0: tensor(7), 1: tensor(8), 2: tensor(11), 3: tensor(4), 4: tensor(5), 5: tensor(2), 6: tensor(9), 7: tensor(10), 8: tensor(0), 9: tensor(6), 10: tensor(1), 11: Tensor (3)} 0 = {tensor} tensor(7) # tensor 0 = 7 1 = {tensor} tensor(8) # tensor 8 2 = {tensor} tensor(11) 3 = {Tensor} tensor(4) 4 = {Tensor} tensor(5) 5 = {Tensor} tensor(2) 6 = {Tensor} tensor(9) 7 = {Tensor} tensor(10) 8 = {Tensor} tensor(0) 9 = {Tensor} tensor(6) 10 = {Tensor} tensor(1) 11 = {Tensor} tensor(3) """        
        
    Sort by complexity
    self.ordered_item_complexity_map = OrderedDict(
        sorted(self.item_complexity_map.items(), key=lambda t: t[1]))Ordered_item_complexity_map = {OrderedDict: 12} OrderedDict([(8, tensor(0)), (10, tensor(1)), (5, tensor(2)), (11, tensor(3)), (3, tensor(4)), (4, tensor(5)), (9, tensor(6)), (0, tensor(7)), (1, tensor(8)), (6, tensor(9)), (7, tensor(10)), (2, Tensor (11)]) 8 = {tensor} tensor(0) 10 = {tensor} tensor(1) # 10 = {tensor} tensor(0) So 1 5 = {Tensor} Tensor (2) 11 = {Tensor} Tensor (3) 3 = {Tensor} Tensor (4) 4 = {Tensor} Tensor (5) 9 = {Tensor} Tensor (6) 0  = {Tensor} tensor(7) 1 = {Tensor} tensor(8) 6 = {Tensor} tensor(9) 7 = {Tensor} tensor(10) 2 = {Tensor} tensor(11) """    
    
    max_complexity = max(self.item_complexity_map.values()) # 11
    min_complexity = min(self.item_complexity_map.values()) # 0
    self.random_number = int((max_complexity - min_complexity) * random_level + 1) # 6
    
# random_number = {int} 1
  
Copy the code

The expansion is as follows:

  • TensorDataset, 0 =… Is the actual data, 1 =… It’s data complexity, and then it’s sort by complexity, and all sorts or scrambles are done without moving the original data, but with extra space.
  • The initialization internally sorts complexity,
    • Item_complexity_map is the original complexity of each element, for example, 0: 7 means that the 0th element is 7.
    • Ordered_item_complexity_map is the ordered structure, where (8, 0) indicates that the 8th element is the least complex, which is 0, and the entire map is in ascending order.

The logical graph of the TensorDataset is extended as follows, and the data set ordered_ITEM_complexity_map is now ordered from low to high in complexity.

+-----------------------------------------------------------------------------+
| TensorDataset                                                               |
|                                                                             |
|   0 = {Tensor: 12} tensor([[-1.5556.0.6848],... | | | |1 = {Tensor: 12} tensor([ 7.8.11.4.5.2.9.10.0.6.1.3]) |
|                                                                             |
+-------------------------------------------+---------------------------------+
                                            |
                                            |
                                            v
+-------------------------------------------+------------------------------------------+
| LoadBalancingDistributedSampler.__init__                                             |
|                                                                                      |
|                                                                                      |
|  item_complexity_map = {dict: 12} {0: 7.1: 8.2: 11.3: 4.4: 5.5: 2,              |
|                                                                                      |
|                                    6: 9.7: 10.8: 0.9: 6.10: 1.11: 3}            |
|                                           +                                          |
|                                           |                                          |
|                                           |  sorted                                  |
|                                           |                                          |
|                                           v                                          |
|  ordered_item_complexity_map = {OrderedDict: 12} [(8.0), (10.1), (5.2), (11.3),  |
|                                                                                      |
|                    (3.4), (4.5), (9.6), (0.7), (1.8), (6.9), (7.10), (2.11) | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - +Copy the code

3.4 the use of

Next in the sample code is using data:

dataloader = torch.utils.data.DataLoader(dataset, sampler=sampler)

cur_idx = 0
for i, data in enumerate(dataloader):
    batch_size = data[0].shape[0]
    cur_idx += batch_size * num_replicas
    print(cur_idx)
Copy the code

3.4.1 Obtaining Data

Let’s look at how to get the data, which is how to get the sample from the Loader.

  • Shuffle_chunks is first called to scramble the data.
  • And then you get the index of your rank.
def __iter__(self) -> Iterator:
    index_chunks, chunk_indices = self.shuffle_chunks() # Scramble data
    # subsample
    indices = [index_chunks[i][self.rank] for i in chunk_indices] Use rank to extract data

Chunk_indices = {list: 6} [0, 5, 4, 1, 2, 3] chunk_chunks = {list: 8, 6} [[3], [5, 11], [4, 10], [0, 9], [6, 1], [7, 2]] evenly divided into two groups of indices = {list: 6} [8, 7, 6, 5, 4, 0] get their rank corresponding index "" "    
    return iter(indices)
Copy the code

3.4.2 shuffle

The shuffle is divided into 6 = 12(num_replicas) and 2(num_replicas) groups.

def shuffle_chunks(self) :
    def chunks_wrap_padding(lst, n) :
        """Yield successive n-sized chunks from lst."""
        num_chunks = max(1, self.num_samples)
        num_elements = num_chunks * n
        current_lst = []
        for i in range(num_elements):
            current_lst.append(lst[i % len(lst)])
            if len(current_lst) == n:
                yield current_lst
                current_lst = []

    if self.shuffle: # Need to be shuffled again
        # deterministically shuffle based on epoch and seed
        g = torch.Generator()
        g.manual_seed(self.seed + self.epoch)

        if self.random_number > 0:
            # The shuffling mechanism here is very clever, just randomly regenerating complexity and adding it to the original complexity map
            item_complexity_map = self.item_complexity_map.copy() Make a copy of the original map
            complexity_random_ints = torch.randint( # new is a number of complexity variations
                self.random_number, (len(item_complexity_map),), generator=g
            ).tolist()
""" complexity_random_ints = {list: 12} [2, 3, 5, 0, 1, 3, 1, 1, 1, 3, 5, 2] item_complexity_map = {dict: 12} {0: tensor(7), 1: tensor(8), 2: tensor(11), 3: tensor(4), 4: tensor(5), 5: tensor(2), 6: tensor(9), 7: tensor(10), 8: tensor(0), 9: tensor(6), 10: tensor(1), 11: tensor(3)} """
            
            # original complexity map + complexity change value
            for k, v in zip(item_complexity_map, complexity_random_ints):
                item_complexity_map[k] += v
Create a new complex item_complexity_map = {0: tensor(9), tensor(11), tensor(16), tensor(4), tensor(6), tensor(5) : tensor(5), 6: tensor(10), 7: tensor(11), 8: tensor(1), 9: tensor(9), 10: tensor(6), 11: tensor(5)} """
        
            # Sort the new complexity again
            ordered_item_complexity_map = OrderedDict(
                sorted(item_complexity_map.items(), key=lambda t: t[1]))""" ordered_item_complexity_map = {OrderedDict: 12} OrderedDict([(8, tensor(1)), (3, tensor(4)), (5, tensor(5)), (11, tensor(5)), (4, tensor(6)), (10, tensor(6)), (0, tensor(9)), (9, tensor(9)), (6, tensor(10)), (1, tensor(11)), (7, tensor(11)), (2, tensor(16))]) 8 = {Tensor} tensor(1) 3 = {Tensor} tensor(4) 5 = {Tensor} tensor(5) 11 = {Tensor} tensor(5) 4 = {Tensor} tensor(6) 10 = {Tensor} tensor(6) 0 = {Tensor} tensor(9) 9 = {Tensor} tensor(9) 6 = {Tensor} tensor(10) 1 = {Tensor} tensor(11) 7 = {Tensor} tensor(11) 2 = {Tensor} tensor(16) __len__ = {int} 12 """
        else:
            ordered_item_complexity_map = self.ordered_item_complexity_map

        index_chunks = list( Num_replicas = num_replicas
            chunks_wrap_padding(
                list(ordered_item_complexity_map.keys()), self.num_replicas
            )
        )

"" "to be evenly distributed into two groups, each group of the complexity of the two elements near index_chunks = {list: 6} [[8, 3], [5, 11], [4, 10], [0, 9], [6, 1], [7, 2]] 0 = {list: 2} [8, 3] 1 = {list: 2} [5, 11] 2 = {list: 2} [4, 10] 3 = {list: 2} [0, 9] 4 = {list: 2} [6, 1] 5 = {list: 2} [7, 2] __len__ = {int} 6 """        
        # Shuffle the Index_chunks again
        chunk_indices = torch.randperm(len(index_chunks), generator=g).tolist()  # type: ignore
    
"""
chunk_indices = {list: 6} [0, 5, 4, 1, 2, 3]
"""    
    
    else:
        index_chunks = list(
            chunks_wrap_padding(
                list(self.ordered_item_complexity_map.keys()), self.num_replicas
            )
        )
        chunk_indices = list(range(len(index_chunks)))  # type: ignore

    if not self.drop_last:
        # add extra samples to make it evenly divisible
        padding_size = self.num_samples - len(chunk_indices)
        if padding_size <= len(chunk_indices):
            chunk_indices += chunk_indices[:padding_size]
        else:
            chunk_indices += (
                chunk_indices * math.ceil(padding_size / len(chunk_indices))
            )[:padding_size]
    else:
        # remove tail of data to make it evenly divisible.
        chunk_indices = chunk_indices[: self.num_samples]
    assert len(chunk_indices) == self.num_samples
    return index_chunks, chunk_indices
Copy the code

The overall expansion is as follows:

  • TensorDataset, 0 =… Is the actual data, 1 =… Is the data complexity, and then the order by complexity:
  • LoadBalancingDistributedSampler.__init__The initialization internally sorts complexity,
    • Item_complexity_map is the complexity of each element, for example, 0: 7 means that the 0th element is 7.
    • Ordered_item_complexity_map is the structure sorted by complexity, where (8, 0) indicates that the eighth element is the least complex, which is 0.
  • Shuffle_chunks is continued internally. The shuffling mechanism here is very clever. Instead of moving the data, it is randomly regenerated into complexity and then added to the original complexity map.
    • Complexity_random_ints is regenerated as a number of complexity variations.
    • Item_complexity_map Makes a copy of the original map.
    • Item_complexity_map: New complexity = original complexity Map + Complexity change value.
    • Ordered_item_complexity_map sorts the new complexity.
    • The ordered_ITEM_complexity_map is fragmented according to the num_REPLICas, and the index_chunks are obtained. The ordered_ITEM_complexity_map is evenly divided into six groups, and the complexity of two elements in each group is similar.
    • Then shuffle the index_chunks again to get chunk_indices, just to shuffle the index order.
+--------------------------------------------------------------------------------------+
| TensorDataset                                                                        |
|                                                                                      |
|   0 = {Tensor: 12} tensor([[-1.5556.0.6848],... | | | |1 = {Tensor: 12} tensor([ 7.8.11.4.5.2.9.10.0.6.1.3])          |
|                                                                                      |
+-------------------------------------------+------------------------------------------+
                                            |
                                            |
                                            v
+-------------------------------------------+------------------------------------------+
| LoadBalancingDistributedSampler.__init__                                             |
|                                                                                      |
|                                                                                      |
|  item_complexity_map = {dict: 12} {0: 7.1: 8.2: 11.3: 4.4: 5.5: 2,              |
|                                                                                      |
|                                    6: 9.7: 10.8: 0.9: 6.10: 1.11: 3}            |
|                                           +                                          |
|                                           |                                          |
|                                           |  sorted                                  |
|                                           |                                          |
|                                           v                                          |
|  ordered_item_complexity_map = {OrderedDict: 12} [(8.0), (10.1), (5.2), (11.3),  |
|                                                                                      |
|                    (3.4), (4.5), (9.6), (0.7), (1.8), (6.9), (7.10), (2.11)] |
|                                                                                      |
+-------------------------------------------+------------------------------------------+
                                            |
                                            |
                                            v
+-------------------------------------------+------------------------------------------+
| __iter__                                                                             |
|                                                                                      |
+-------------------------------------------+------------------------------------------+
                                            |
                                            |
                                            v
+-------------------------------------------+------------------------------------------+
|                                                                                      |
| shuffle_chunks()                                                                     |
|                                                                                      |
|                                                                                      |
|   complexity_random_ints = {list: 12} [2.3.5.0.1.3.1.1.1.3.5.2]           |
|                                                                                      |
|                                                                                      |
|                                                                                      |
|   item_complexity_map = {0: 9.1: 11.2: 16.3: 4.4: 6.5: 5.6: 10.7: 11.8: 1,   |
|                                                                                      |
|                                                                9: 9.10: 6.11: 5}   |
|                                                                                      |
|                                                                                      |
|                                                                                      |
|   ordered_item_complexity_map = {OrderedDict: 12} [(8.1), (3.4), (5.5), (11.5),  |
|                                                                                      |
|                                                    (4.6), (10.6), (0.9), (9.9),  |
|                                                                                      |
|                                                (6.10), (1.11), (7.11), (2.16)])  |
|                                                                                      |
|                                           +                                          |
|                                           |                                          |
|                                           |                                          |
|                                           v                                          |
|                                                                                      |
|     index_chunks = {list: 6} [[8.3], [5.11], [4.10], [0.9], [6.1], [7.2]]      |
|                                                                                      |
|                                                                                      |
|     chunk_indices = {list: 6} [0.5.4.1.2.3] | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - +Copy the code

The rule 3.4.3 combed

Shuffle refinement

This may be confusing to the reader, so we need to sort it out.

Ordered_item_complexity_map is the structure sorted by complexity, where (8, 0) indicates that the eighth element is the least complex, which is 0. Ordered_item_complexity_map has 12 elements assigned in two copies, so ordered_ITEM_complexity_map should be evenly divided into six groups with two elements of similar complexity in each group.

index_chunks = {list: 8, 6} [[3], [5, 11], [4, 10], [0, 9], [6, 1], [7, 2]] is the final result, here [8, 3] is a set of complexity, [5, 11] is a set, the complexity of the near, For example, ordered_ITEM_complexity_map:

  • (8, 1), (3, 4) that is, the eighth element is 1, the third element is 4, so index 8, index 3 are grouped.

  • (5, 5), (11, 5) that is, the 5th element is 5, the 11th element is 5, so index 5, index 11 are grouped.

Shuffle_chunks is shown as follows:

+--------------------------------------------------------------------------------------+
| shuffle_chunks                                                                       |
|                                                                                      |
|                                                                                      |
|                                      +--------------+     +---------------+          |
|   ordered_item_complexity_map = [ +--+(8.1), (3.4) - | + + (5.5), (11.5) | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | + -- -- -- -- -- -- -- + (4.6), (10.6) | | | -0.9), (9.9) + -- -- -- -- -- -- -- + | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | | | | | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - + | | | | | | -6.10), (1.11) | | | -7.11), (2.16) |] | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- + + | + -- -- -- -- -- -- -- -- -- -- - + -- -- -- -- -- + | | | | | | | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + + -- -- -- -- -- -- -- -- -- -- -- -- - + + - +  | | | | | | | | | | | | +------------+ +---------------------+ | | | | | | | | | | v v v v v v | | index_chunks = {list: 6} [[8.3], [5.11], [4.10], [0.9], [6.1], [7.2]]      |
|                                                                                      |
|                                      +                                               |
|                                      |                                               |
|                                      |                                               |
|                                      v                                               |
|                                                                                      |
|     chunk_indices = {list: 6} [0.5.4.1.2.3] | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - +Copy the code
Secondary upset

Let’s go back to the raw data and analyze it. Let’s go back and get the data.

def __iter__(self) -> Iterator:
    index_chunks, chunk_indices = self.shuffle_chunks()
    # subsample
    indices = [index_chunks[i][self.rank] for i in chunk_indices]

Chunk_chunks = {list: 6} [0, 5, 4, 1, 2, 3] index_chunks = {list: 8, 6} [[3], [5, 11], [4, 10], [0, 9], [6, 1], [7, 2]] evenly divided into two groups of indices = {list: 6} [8, 7, 6, 5, 4, 0] get their rank corresponding index "" "    
    
    assert len(indices) == self.num_samples

    return iter(indices)
Copy the code

The original data is: [7, 8, 11, 4, 5, 2, 9, 10, 0, 6, 1, 3]. The subsequent data will be sorted according to the index of the original data.

Rank 0 is [8, 5, 4, 0, 6, 7]. Rank 1 is 3, 11, 10, 9, 1, 2.

Rank and rank 1 0 batch is [[8, 3], [5, 11], [4, 10], [0, 9], [6, 1], [7, 2]], the 221 group.

However, the order needs to be shuffled again, because the current batch is sorted according to the complexity from small to large, which will affect the training effect, so the order needs to be shuffled. Chunk_indices [0, 5, 4, 1, 2, 3]

The order of the sequence is: [[8, 3], [7, 2], [6, 1], [5, 11], [4, 10], [0, 9]].

  • If the worker is rank 0, it will get the six sets of data corresponding to its own index_chunks and get [8, 7, 6, 5, 4, 0].

  • If the worker rank is 1, it is [3,2,1,11,10,9]. Note that these are also indexes of the raw data.

Rank 0 = rank 0 = rank 0

+--------------------------------------------------------------------------------------+
| shuffle_chunks                                                                       |
|                                                                                      |
|                                      +--------------+     +---------------+          |
|   ordered_item_complexity_map = [ +--+(8.1), (3.4) - | + + (5.5), (11.5) | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | + -- -- -- -- -- - + (4.6), (10.6) | | | -0.9), (9.9) + -- -- -- -- -- -- + | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | | | | | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | | | | -6.10), (1.11) | | | -7.11), (2.16) |] | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- + + | + -- -- -- -- -- -- -- -- -- -- - + -- -- -- -- -- + | | | | | | | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + + -- -- -- -- -- -- -- -- -- -- -- -- - + + - + | | | | | | | | | | | | +------------+ +--------------------+ | | | | | | | | | | v v v v v v | | index_chunks = {list: 6} [[8.3], [5.11], [4.10], [0.9], [6.1], [7.2]]      |
|                                      +                                               |
|                                      |                                               |
|                                      |                                               |
|                                      v                                               |
|     chunk_indices = {list: 6} [0.5.4.1.2.3]                                     |
|                                                                                      |
+--------------------------------------+-----------------------------------------------+
                                       |
                                       |
                                       v

+--------------------------------------------------------------------------------------+
| __iter__                                                                             |
|                                    0       1        2        3       4       5       |
|        index_chunks = {list: 6} [[8.3], [5.11], [4.10], [0.9], [6.1], [7.2]]   |
|                                   +       +        +        +       +       +        |
|                                   |       |        |        |       |       |        |
|                                   +----+  +-----+  |  +-----+       |       |        |
|                                        |        |  |  |             |       |        |
|                                        |        |  |  |             |       |        |
|                                        v        v  v  v             |       |        |
|                   indices = {list: 6} [8.7.6.5.4.0] | | | | ^ ^ | | | | | | | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | +--------------------------------------------------------------------------------------+Copy the code
The final result

Let’s see what the end result is:

  • The original data is: [7, 8, 11, 4, 5, 2, 9, 10, 0, 6, 1, 3].

  • Rank 0 is [8, 7, 6, 5, 4, 0], and rank 1 is [3,2,1,11,10,9]. Here the value is the index of the original data.

  • The end result:

    • The batch of rank 0 and Rank 1 is [[8, 3], [7, 2], [6, 1], [5, 11], [4, 10], [0, 9]. Here the value is the index of the original data.
    • Rank 1: [4, 11, 7, 3, 1, 6]

As can be seen from the following figure, it is not an ideal equilibrium state due to the introduction of random values in the process, but it is relatively balanced:

                         + 7                          + 6
                         |                            |
                         | 5                          | 1
                         |                            |
                         | 2                          | 3
                         |                            |
batch 3   +----------->  | 9                          | 7  <----------+  batch 3
                         |                            |
batch 2   +----------->  | 10                         | 11 <----------+  batch 2
                         |                            |
batch 1   +----------->  v 0                          v 4  <----------+  batch 1

                  +-------------------+        +-------------------+
                  |                   |        |                   |
                  |     worker 0      |        |     worker 1      |
                  |                   |        |                   |
                  |                   |        |                   |
                  +-------------------+        +-------------------+
Copy the code

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0 XFF reference

PyTorch internals

Quick gossip! Here comes an open source distributed training framework that breaks the parallel bottleneck of TensorFlow and PyTorch.

Arxiv.org/pdf/2107.01…

[1] Dean, Jeffrey, Greg S. Corrado, Rajat Monga, Kai Chen, Matthieu Devin, Quoc V. Le, Mark Z. Mao et al. “Large Scale Distributed Deep Networks” (2012).

[2] Zhengyuan Zhou, Panayotis Mertikopoulos, Nicholas Bambos, Peter Glynn, Yinyu Ye, Li-Jia Li, and Li Fei-Fei. 2018. Distributed asynchronous optimization with unbounded delays: How slow can you go? In International Conference on Machine Learning. PMLR, 5970 — 5979.

[3] DanAlistarh, DemjanGrubic, JerryLi, RyotaTomioka, and MilanVojnovic. 2016. QSGD: Communication-efficient SGD via gradient quantization and encoding. arXiv preprint arXiv:1610.02132 (2016).

[4] Dan Alistarh, Torsten Hoefler, Mikael Johansson, Sarit Khirirat, Nikola Konstanti- nov, And Cedric Renggli. 2018. Convergence of Sparsified gradient methods. In Proceedings of The 32ND International Conference on Neural Information Processing Systems. 5977 — 5987.

[5] Anastasia Koloskova, Sebastian Stich, and Martin Jaggi. 2019. Decentralized stochastic optimization and gossip algorithms with compressed communication. In International Conference on Machine Learning. PMLR, 3478 — 3487.

[6] Xiangru Lian, Ce Zhang, Huan Zhang, Cho-Jui Hsieh, Wei Zhang, and Ji Liu. 2017. Can decentralized algorithms outperform centralized algorithms? a case study for decentralized parallel stochastic gradient descent. In Proceedings of the 31st International Conference On Neural Information Processing Systems. 5336 — 5346.

[7] Christopher De Sa, Matthew Feldman, Christopher Re, and Kunle Olukotun. 2017. Understanding and optimizing asynchronous low-precision stochastic gradient descent. In Proceedings of the 44th Annual International Symposium on Computer Architecture. 561 — 574.

[8] Xiangru Lian, Wei Zhang, Ce Zhang, and Ji Liu. 2018. Asynchronous decentral- ized parallel stochastic gradient descent. In International Conference on The Machine Learning. PMLR, 3043-3052.

[9] Hanlin Tang, Shaoduo Gan, Ce Zhang, Tong Zhang, and Ji Liu. 2018. Com- munication compression for decentralized training. In Proceedings of the 32nd International Conference on Neural Information Processing Systems. 763 — 773.

[10] Ji Liu, Ce Zhang, Foundations and Trends® in Databases 9, 1 (2020), 1-100.