0x00 Abstract

In the previous articles, we introduced PipeDream's overall architecture, the profile stage, the compute-partitioning stage, the model-transformation stage, and the runtime engine. In this article we introduce PipeDream's communication module, which is the foundation of the runtime engine, and which is also a vivid and complete example of how PyTorch DDP and P2P are used.

Other articles in this pipeline-parallelism series are linked below:

Deep learning pipeline parallelism GPipe (1) – basic pipeline implementation

Deep learning pipeline parallelism GPipe (2) – gradient accumulation

Deep learning pipeline parallelism GPipe (3) – recomputation

Deep learning pipeline parallelism PipeDream (1) – profile stage

Deep learning pipeline parallelism PipeDream (2) – compute partitioning

Deep learning pipeline parallelism PipeDream (3) – model transformation

Deep learning pipeline parallelism PipeDream (4) – runtime engine

0x01 Preface

The communication module code is located in runtime/communication.py. Let's first think about what capabilities the communication module needs:

  • Communication between stages. How are stages on different machines handled? What about stages on the same machine?
  • Since communication is mostly asynchronous and different nodes may run at different speeds, is a buffering mechanism needed to coordinate them, similar to back pressure?
  • Deep learning involves many parameters, tensors and gradients, and many layers, and the degree of data parallelism can differ per layer. How do forward and backward propagation guarantee that messages are processed in a well-defined order?
  • Since each node performs both forward and backward propagation, multiple threads are needed to handle the transfers in each direction.

We will keep these questions in mind in the analysis below.

0x02 Class definition

The CommunicationHandler is responsible for communication between stages.

  • If the stages are on different machines, PyTorch P2P send/recv is used.
  • If the stages are on the same machine, PyTorch broadcast is used.

In the following code, various member variables are initialized; the most familiar parts are the DDP-related ones, such as init_process_group.

class CommunicationHandler(object):
    """ Handles communication between stages.

    For stages on different machines, use send/recv.
    For stages on same machine, use broadcast.
    """
    def __init__(self, master_addr, master_port, rank,
                 local_rank, num_ranks_in_server,
                 world_size, fp16, backend):
        """ Set up process groups.

        Note: To turn off broadcasting, set num_ranks_in_server = 1.
        """
        self.rank = rank
        self.local_rank = local_rank
        self.backend = backend
        self.num_ranks_in_server = num_ranks_in_server
        self.world_size = world_size
        self.fp16 = fp16
        assert num_ranks_in_server > 0

        # Initialize the distributed environment.
        os.environ['MASTER_ADDR'] = master_addr
        os.environ['MASTER_PORT'] = str(master_port)
        dist.init_process_group(backend, rank=rank, world_size=world_size)
        assert dist.get_world_size() == self.world_size

        # Stores list of ranks of GPUs on the same server.
        self.ranks_in_server = []

        if num_ranks_in_server == 1:
            return

        # Stores information about tensors sent directly GPU-to-GPU.
        self.connection_list = []

        # Stores process groups (for broadcast() connections).
        self.process_groups = {}

        # Populate ranks_in_server.
        rank_of_first_gpu_in_server = rank - rank % num_ranks_in_server
        for connected_rank in range(
            rank_of_first_gpu_in_server,
            rank_of_first_gpu_in_server + num_ranks_in_server):
            if connected_rank == rank:
                continue
            self.ranks_in_server.append(connected_rank)
        assert len(self.ranks_in_server) == num_ranks_in_server - 1, \
            self.ranks_in_server
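For orientation, a stage runtime would construct the handler roughly as follows. The values here are hypothetical and only illustrate the constructor signature above (a single server with 4 GPUs, one rank per GPU); they are not taken from the PipeDream code.

# Hypothetical construction; values are illustrative only.
comm_handler = CommunicationHandler(
    master_addr="127.0.0.1",
    master_port=29500,
    rank=0,                  # global rank of this worker
    local_rank=0,            # GPU index on this server
    num_ranks_in_server=4,   # set to 1 to turn off broadcast-based GPU-to-GPU paths
    world_size=4,
    fp16=False,
    backend="gloo")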

0x03 Construction

3.1 Initialization

As mentioned in the previous article, when a CommunicationHandler is created, its initialize method is called.

        if self.comm_handler is not None:
            self.comm_handler.initialize(
                self.receive_ranks,
                self.send_ranks,
                self.tensor_tags,
                self.target_tensor_names,
                self.training_tensor_dtypes,
                self.rank_in_stage,
                self.num_ranks_in_stage,
                self.ranks_in_previous_stage,
                self.ranks_in_next_stage)
​

The initialization code mainly does the following:

  • Build the queue needed for communication.
  • Build the order in which messages are sent.
  • Build the process group.
def initialize(self, receive_ranks, send_ranks,
               tensor_tags, target_tensor_names,
               training_tensor_dtypes,
               rank_in_stage,
               num_ranks_in_stage,
               ranks_in_previous_stage,
               ranks_in_next_stage):
    """
    Initialize state needed for CommunicationHandler.
    """
    self.receive_ranks = receive_ranks
    self.send_ranks = send_ranks
    self.tensor_tags = tensor_tags
    self.target_tensor_names = target_tensor_names
    self.training_tensor_dtypes = training_tensor_dtypes
    self.rank_in_stage = rank_in_stage
    self.num_ranks_in_stage = num_ranks_in_stage
    self.ranks_in_previous_stage = ranks_in_previous_stage
    self.num_ranks_in_previous_stage = len(ranks_in_previous_stage)
    self.ranks_in_next_stage = ranks_in_next_stage
    self.num_ranks_in_next_stage = len(ranks_in_next_stage)

    self.setup_queues()               # Build the queues needed for communication
    self.setup_messaging_schedule()   # Build the order in which messages are sent
    self.create_process_groups()      # Build the process groups

Our specific analysis is as follows.

3.2 Creating queues

Queues are the basis of send and receive: the code locates a queue by index and then operates on it.

The initialize function receives two rank lists:

  • receive_ranks: the ranks that this node's inputs come from.
  • send_ranks: the ranks that this node's outputs go to.

An example of such a rank list is as follows:

receive_ranks = {dict: 3}        # the three inputs of this node all come from rank 2
 'out8' = {list: 1} [2]          # tensor 'out8' is received from rank 2
 'out9' = {list: 1} [2]
 'out10' = {list: 1} [2]
 __len__ = {int} 3

setup_queues creates the following queues:

  • forward_receive_queues: queues for tensors received during forward propagation. Corresponds to receive_ranks.
  • backward_send_queues: queues for tensors sent during backward propagation. Corresponds to receive_ranks, because what is received from in the forward pass is the target to send to in the backward pass.
  • forward_send_queues: queues for tensors sent during forward propagation. Corresponds to send_ranks.
  • backward_receive_queues: queues for tensors received during backward propagation. Corresponds to send_ranks, because what is sent to in the forward pass is what is received from in the backward pass.

The general logic is as follows:

forward_receive_queues <-----> receive_ranks <------->  backward_send_queues
forward_send_queues  <------>  send_ranks    <------->  backward_receive_queues

Take forward_receive_queues as an example.

  • forward_receive_queues holds, per tensor, a list of queues.
  • The receive_ranks dict is keyed by tensor name (names similar to those in target_tensor_names = {"target", "target_length"}); each tensor name maps to the list of ranks this tensor is received from (see the concrete example after this list).
  • In forward_receive_queues, each queue corresponds to one rank of one tensor in receive_ranks.
  • Each tensor has a unique tag. PipeDream's intent is that each tag gets its own process group, because any stage may be replicated in parallel.
  • For this tensor and its unique tag, [tag, rank] is registered into connection_list.
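To make these data structures concrete, here is a hypothetical example consistent with the rank list shown earlier (the tag values and the 'out11' entry are illustrative only; in PipeDream the tags are assigned by the runtime described in the previous article):

receive_ranks = {'out8': [2], 'out9': [2], 'out10': [2]}     # tensor name -> upstream ranks
send_ranks    = {'out11': [4]}                               # tensor name -> downstream ranks (hypothetical)
tensor_tags   = {'out8': 1, 'out9': 2, 'out10': 3, 'out11': 4, 'ack': 5}

# setup_queues() would then create, e.g., forward_receive_queues['out8'] == [Queue()],
# and register_tensor() would append [tag, rank] == [1, 2] to connection_list.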

Details are as follows:

def setup_queues(self):
    """
    Setup queues for communication between main compute thread
    and helper communication threads. One queue per tensor
    in forward / backward direction.
    """
    self.forward_receive_queues = {}
    self.backward_receive_queues = {}
    self.forward_send_queues = {}
    self.backward_send_queues = {}
    self.num_forward_threads = 0
    self.num_backward_threads = 0

    self.target_receive_rank_counts = {}
    self.target_send_rank_counts = {}
    # Setup queues for each tensor to be received and sent.
    for input_name in self.receive_ranks:
        # input_name is the name of an input tensor; build its queues.
        self.forward_receive_queues[input_name] = []
        self.backward_send_queues[input_name] = []
        # Walk through the ranks corresponding to this tensor.
        for i in range(len(self.receive_ranks[input_name])):
            self.forward_receive_queues[input_name].append(
                threadsafe_queue.Queue())
            self.backward_send_queues[input_name].append(
                threadsafe_queue.Queue())
            # Take the rank this tensor is received from.
            target_receive_rank = self.receive_ranks[input_name][i]
            # Register the tensor for this rank.
            self.register_tensor(
                connected_rank=target_receive_rank,
                tag=self.tensor_tags[input_name])
            if target_receive_rank not in self.target_receive_rank_counts:
                self.target_receive_rank_counts[target_receive_rank] = 0
            self.target_receive_rank_counts[target_receive_rank] += 1
            self.num_forward_threads += 1
            self.num_backward_threads += 1

    for output_name in self.send_ranks:
        # output_name is the name of an output tensor; build its queues.
        self.backward_receive_queues[output_name] = []
        self.forward_send_queues[output_name] = []
        # Walk through the ranks corresponding to this tensor.
        for i in range(len(self.send_ranks[output_name])):
            self.backward_receive_queues[output_name].append(
                threadsafe_queue.Queue())
            self.forward_send_queues[output_name].append(
                threadsafe_queue.Queue())
            # Take the rank this tensor is sent to.
            target_send_rank = self.send_ranks[output_name][i]
            # Register the tensor for this rank.
            self.register_tensor(
                connected_rank=target_send_rank,
                tag=self.tensor_tags[output_name])
            if target_send_rank not in self.target_send_rank_counts:
                self.target_send_rank_counts[target_send_rank] = 0
            self.target_send_rank_counts[target_send_rank] += 1
            self.num_forward_threads += 1
            self.num_backward_threads += 1

    for target_tensor_name in self.target_tensor_names:
        # Queues for target in forward pass.
        self.forward_receive_queues[target_tensor_name] = []
        self.forward_send_queues[target_tensor_name] = []

        if self.num_ranks_in_previous_stage > 0:
            self.receive_ranks[target_tensor_name] = self.ranks_in_previous_stage
            for i in range(len(self.receive_ranks[target_tensor_name])):
                # Register the tensor for each rank in the previous stage.
                self.register_tensor(
                    connected_rank=self.receive_ranks[target_tensor_name][i],
                    tag=self.tensor_tags[target_tensor_name])
                self.forward_receive_queues[target_tensor_name].append(
                    threadsafe_queue.Queue())
                self.num_forward_threads += 1

        if self.num_ranks_in_next_stage > 0:
            self.send_ranks[target_tensor_name] = self.ranks_in_next_stage
            for i in range(len(self.send_ranks[target_tensor_name])):
                self.register_tensor(
                    connected_rank=self.send_ranks[target_tensor_name][i],
                    tag=self.tensor_tags[target_tensor_name])
                self.forward_send_queues[target_tensor_name].append(
                    threadsafe_queue.Queue())
                self.num_forward_threads += 1

    print ("Send ranks: ", self.send_ranks)
    print ("Receive ranks: ", self.receive_ranks)

    # Queues for ack for forward pass-only runs as a clocking mechanism.
    self.num_ack_threads = 0
    if "ack" in self.tensor_tags:
        self.backward_receive_queues["ack"] = []
        self.backward_send_queues["ack"] = []
        for i in range(self.num_ranks_in_previous_stage):
            # Register the ack tensor for each rank in the previous stage.
            self.register_tensor(
                connected_rank=self.ranks_in_previous_stage[i],
                tag=self.tensor_tags["ack"])
            self.backward_send_queues["ack"].append(
                threadsafe_queue.Queue())
            self.num_ack_threads += 1
        for i in range(self.num_ranks_in_next_stage):
            # Register the ack tensor for each rank in the next stage.
            self.register_tensor(
                connected_rank=self.ranks_in_next_stage[i],
                tag=self.tensor_tags["ack"])
            self.backward_receive_queues["ack"].append(
                threadsafe_queue.Queue())
            self.num_ack_threads += 1

Note that each tensor has a unique tag. For this tensor and this unique tag, register [tag, rank] to connection_list.

    def register_tensor(self, connected_rank, tag):
        """
        Builds connections list of tensors that are communicated GPU to GPU.

        For tensors that are sent GPU-to-GPU (intra-server for GLOO backend),
        make a list of destination/source ranks and the corresponding tag.
        This information is then used to create process groups.
        """
        if not self.is_gpu_to_gpu_comm(connected_rank=connected_rank):
            return
        connection_info = [tag, connected_rank]
        self.connection_list.append(connection_info)
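register_tensor relies on is_gpu_to_gpu_comm, which is not shown above. Judging from how it is used, it simply checks whether the connected rank lives on the same server, based on the ranks_in_server list built in __init__; a sketch along those lines:

    def is_gpu_to_gpu_comm(self, connected_rank):
        # Broadcast-based GPU-to-GPU communication is only used between
        # ranks that sit on the same server.
        if connected_rank in self.ranks_in_server:
            return True
        return False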

The logic is as follows. We take only parts of the ranks, queues, etc. The queues in forward_receive_queues are used as buffers for the corresponding tensors.

+------------------------+          'out8' = {list: 1} [2]
|                        |
|    receive_ranks  +------------>  'out9' = {list: 1} [2]
|                        |
+------------------------+          'out10' = {list: 1} [2]

+--------------------------+        'out8'  : Queue
|                          |
|  forward_receive_queues +------>  'out9'  : Queue
|                          |
+--------------------------+        'out10' : Queue

+--------------------------+        'out8'  : rank 2
|                          |
|    connection_list  +--------->   'out9'  : rank 2
|                          |
+--------------------------+        'out10' : rank 2

3.3 Forward and backward sequence

Next, we establish the order of message passing for forward and backward, so that each worker records which ranks in the previous/next stage it should interact with.

3.3.1 Establishing the sequence

The setup_messaging_schedule method is used to establish:

  • The order in which tensors are received during forward propagation.
  • The order in which tensors are sent during backward propagation.

The key point is: if the previous stage has more ranks than this stage, then rank i of this stage is assigned the previous-stage ranks i, i + n, i + 2n, ... (where n = num_ranks_in_stage); these form rank i's message_schedule.

Finally, these schedules are stored in the self.messaging_schedule member variable. If this stage has three ranks, self.messaging_schedule holds the message_schedule of each of the three ranks, and each message_schedule contains the upstream ranks that the corresponding rank is responsible for.

To further refine:

  • self.messaging_schedule is a list.
  • Each item of self.messaging_schedule is itself a list: self.messaging_schedule[i] is the schedule (message_schedule) of the i-th rank of this stage.
  • A message_schedule contains some ranks of the previous or next stage.
  • The ranks stored in a message_schedule are indexes rather than actual rank values. Since they are only used internally, this is fine as long as they can be mapped onto other internal data structures such as the queues.

The code is as follows:

def setup_messaging_schedule(self):
    """ Order in which to receive forward and send backwards.

    Separate indexes of ranks in previous stage based on their
    corresponding offset in this stage. Then each worker will go
    in increasing order within a subset, and process subsets in
    a decreasing order.

    This is done so that messages are processed in the order
    that they are sent. Backwards send is done so that that it
    matches up with forward receive.
    """
    self.messaging_schedule = []
    for i in range(self.num_ranks_in_stage):
        idx = i
        message_schedule = []
        while idx < self.num_ranks_in_previous_stage:
            # Add the previous-stage ranks i, i + n, i + 2n, ...
            # (n = num_ranks_in_stage) to the schedule of rank i.
            message_schedule.append(idx)
            idx += self.num_ranks_in_stage
        if len(message_schedule) > 0:
            self.messaging_schedule.append(message_schedule)

    self.fwd_messaging_scheduling_row = self.rank_in_stage   # its own rank index
    self.fwd_messaging_scheduling_col = 0                    # receive forward
    self.bwd_messaging_scheduling_row = self.rank_in_stage   # its own rank index
    self.bwd_messaging_scheduling_col = 0                    # send backwards

    # For cases where previous stage has less workers than current stage.
    while self.fwd_messaging_scheduling_row >= \
        len(self.messaging_schedule):
        self.fwd_messaging_scheduling_row -= 1
        self.bwd_messaging_scheduling_row -= 1
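For intuition, here is a small standalone sketch (not PipeDream code) that reproduces the schedule construction for a hypothetical stage with 3 ranks whose previous stage has 8 ranks:

# Hypothetical sizes: previous stage has 8 ranks, this stage has 3 ranks.
num_ranks_in_stage = 3
num_ranks_in_previous_stage = 8

messaging_schedule = []
for i in range(num_ranks_in_stage):
    idx = i
    message_schedule = []
    while idx < num_ranks_in_previous_stage:
        message_schedule.append(idx)          # upstream ranks i, i+3, i+6, ...
        idx += num_ranks_in_stage
    if message_schedule:
        messaging_schedule.append(message_schedule)

print(messaging_schedule)   # [[0, 3, 6], [1, 4, 7], [2, 5]]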

The specific logic is as follows:

+-------------------+     +--------------------------------------------------+
| Stage 0           |     | Stage 1                                          |
|                   |     |                                                  |
|                   |     |   +----------------------------------------+     |
|   send_ranks      |     |   |  messaging_schedule                    |     |
|     ranks:        |     |   |                                        |     |
|                +----------->|                                        |     |
|   [0,1,2,3,4,5,   |     |   |   message_schedule +--> [0,1,2,9]      |     |
|    6,7,8,9,10,    |     |   |                                        |     |
|    11,12]         |     |   |   message_schedule +--> [3,4,5,6,10]   |     |
|                   |     |   |                                        |     |
|                   |     |   |   message_schedule +--> [6,7,8,11]     |     |
|                   |     |   |                                        |     |
|                   |     |   +----------------------------------------+     |
|                   |     |                                                  |
+-------------------+     +--------------------------------------------------+

3.3.2 Obtaining the message sequence

The get_messaging_index method determines which rank the current transfer should interact with.

def get_messaging_index(self, sending):
    if sending:
        connection_rank = self.messaging_schedule[
            self.bwd_messaging_scheduling_row][
                self.bwd_messaging_scheduling_col]
    else:
        connection_rank = self.messaging_schedule[
            self.fwd_messaging_scheduling_row][
                self.fwd_messaging_scheduling_col]

    return connection_rank

Where is get_messaging_index used? In send and recv, i.e., whenever this stage exchanges data with the previous stage.

For example:

def recv(self, tensor_name, forward_minibatch_id,
         backward_minibatch_id, backward=False):
    if backward:
        index = (backward_minibatch_id + self.rank_in_stage) % \
            len(self.backward_receive_queues[tensor_name])
        tensor = self.backward_receive_queues[tensor_name][
            index].remove()
        return tensor
    else:
        # Determine which upstream rank (and hence which queue) to use.
        index = self.get_messaging_index(sending=False)
        # Take the tensor out of the corresponding queue.
        tensor = self.forward_receive_queues[tensor_name][
            index].remove()
        if tensor.dtype == torch.float32:
            tensor = tensor.requires_grad_()
        return tensor

3.3.3 Incrementing the message index

The increment_messaging_index method advances the index so that the next message uses the next entry in the schedule.

Two indexes are involved:

  • bwd_messaging_scheduling_col: the index of the specific upstream rank within the schedule.
  • bwd_messaging_scheduling_row: the index of this worker's own rank.

The method is as follows:

def increment_messaging_index(self, sending):
    if sending:
        self.bwd_messaging_scheduling_col += 1   # advance within this schedule
        if self.bwd_messaging_scheduling_col == len(
                self.messaging_schedule[
                    self.bwd_messaging_scheduling_row]):
            self.bwd_messaging_scheduling_col = 0
            self.bwd_messaging_scheduling_row -= 1   # move to the previous schedule
            if self.bwd_messaging_scheduling_row == -1:
                # Wrap around to the last schedule in self.messaging_schedule.
                self.bwd_messaging_scheduling_row = \
                    len(self.messaging_schedule) - 1
    else:
        self.fwd_messaging_scheduling_col += 1   # advance within this schedule
        if self.fwd_messaging_scheduling_col == len(
                self.messaging_schedule[
                    self.fwd_messaging_scheduling_row]):
            self.fwd_messaging_scheduling_col = 0
            self.fwd_messaging_scheduling_row -= 1   # move to the previous schedule
            if self.fwd_messaging_scheduling_row == -1:
                # Wrap around to the last schedule in self.messaging_schedule.
                self.fwd_messaging_scheduling_row = \
                    len(self.messaging_schedule) - 1
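Continuing the hypothetical example above (previous stage with 8 ranks, this stage with 3 ranks), the following standalone sketch simulates how the forward-receive index walks through the schedule for the worker with rank_in_stage = 1:

# Standalone simulation of get_messaging_index / increment_messaging_index
# for the forward direction; values are hypothetical.
messaging_schedule = [[0, 3, 6], [1, 4, 7], [2, 5]]
row, col = 1, 0        # fwd_messaging_scheduling_row / _col for rank_in_stage = 1

received_from = []
for _ in range(8):                                        # one full sweep over upstream ranks
    received_from.append(messaging_schedule[row][col])    # get_messaging_index
    col += 1                                              # increment_messaging_index logic
    if col == len(messaging_schedule[row]):
        col = 0
        row -= 1
        if row == -1:
            row = len(messaging_schedule) - 1

print(received_from)   # [1, 4, 7, 0, 3, 6, 2, 5]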

Where will it be used? It is used in the following functions:

def receive_tensors_forward(self):
    if self.loader_iter is not None:
        # ......
    else:
        # Receive all required tensors from upstream machines.
        # ......

        # Used to track where to receive forward from.
        self.comm_handler.increment_messaging_index(
            sending=False)

def send_tensors_backward(self):
    # Send all required gradients upstream.
    # ......

    if self.num_ranks_in_previous_stage > 0:
        # Used to track where to send tensors in the
        # backward pass.
        self.comm_handler.increment_messaging_index(
            sending=True)

def run_ack(self):
    if self.stage > 0:
        self.comm_handler.send(
            "ack",
            torch.zeros(self.tensor_shapes["ack"],
                        dtype=torch.int64).cuda(),
            forward_minibatch_id=self.forward_minibatch_id,
            backward_minibatch_id=self.backward_minibatch_id,
            backward=True)
        # Used to track where to receive forward from.
        self.comm_handler.increment_messaging_index(sending=True)

3.4 Creating a Process Group

The goal is to set up two process groups for each tensor: one for the forward direction and one for the backward direction. Each tensor has its own tag, and each tag gets its own pair of process groups, because any stage may be replicated in parallel.

3.4.1 Design

First, let’s take a look at the comments and learn why they are designed this way.

The create_process_groups method creates the process groups in the same order on all ranks. To do this, each worker needs to see the connection_list (GPU-to-GPU connections) of every other worker. First, each worker gathers the largest connection_list size (L) among all workers. Then each worker creates a tensor of size Lx2, where each row represents one connection, and fills this tensor according to its own connection list; the worker(s) with the largest connection list fill the entire tensor.

After this tensor is built, an all_gather is performed, after which every worker holds an identical NxLx2 output, where N is the number of workers (world_size), and each index of the output represents one worker's connection list. For i = self.rank, the output equals this worker's local connection list.

Each worker then iterates over the connection list in the same order, checks whether each connection has already been created (every connection appears twice in the output), and if not, creates a new process group for it, for both the forward and the backward direction. Since ranks within a process group must always be listed in the same order, the smaller rank always goes first, followed by the larger rank.
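The following standalone sketch (a hypothetical example with 3 workers and no actual dist calls) illustrates the padding and the identical NxLx2 structure that every worker ends up iterating over:

import torch

# Hypothetical connections: rank 0 talks to rank 1 (tag 1) and to rank 2 (tag 2).
connection_lists = {0: [[1, 1], [2, 2]], 1: [[1, 0]], 2: [[2, 0]]}
world_size = 3

# Largest connection-list size L (PipeDream obtains this via all_gather).
L = max(len(c) for c in connection_lists.values())

# Each worker pads its own list into an L x 2 tensor filled with -1.
padded = []
for rank in range(world_size):
    t = torch.ones([L, 2], dtype=torch.int) * -1
    conns = connection_lists[rank]
    if conns:
        t[0:len(conns)] = torch.IntTensor(conns)
    padded.append(t)

# After dist.all_gather every worker holds this identical N x L x 2 structure,
# so all workers walk it in the same order and create the same process groups.
created = set()
for src_rank, tensor in enumerate(padded):
    for tag, dst_rank in tensor.tolist():
        if tag == -1:
            continue                          # padding row
        key = (min(src_rank, dst_rank), max(src_rank, dst_rank), tag)
        if key in created:
            continue                          # each connection appears twice
        created.add(key)
        print("need forward/backward groups for ranks", key[:2], "tag", key[2])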

3.4.2 Code

Back to the code, let’s go through it.

+--------------------------+       'out8' : rank 2
|                          |
|    connection_list  +--------->  'out9' : rank 2
|                          |
+--------------------------+       'out10' : rank 2

Therefore, the purpose is to establish the same process group in each worker. For each tensor, set two process groups, one forward and one backward.

  • Find the largest connection_list among all workers:

  • Get the size of the local connection_list, connection_list_size

  • Use collective communication (all_gather) to aggregate connection_list_size; gathered_connection_list_sizes is the collection of connection_list_size on all nodes

  • Take the maximum value, max_connection_list_size

  • Use the maximum size to build the tensor connection_list_tensor

  • Move the tensor onto the GPU (for the NCCL backend)

  • Use all_gather to aggregate connection_list_tensor, obtaining aggregated_connection_list

  • On each worker, create identical process groups using dist.new_group

  • Iterate over each connection in aggregated_connection_list

    • Get the tag of the tensor
    • For each tensor and tag, set up two process groups, one forward and one backward

This is where connection_list is used.

The specific code is as follows:

def create_process_groups(self):
    """ Create process groups in the same order across all ranks.

    To create process groups in the same order, each worker collects
    the connection_list of all other workers. To do this, every worker
    gathers the largest size of all other worker's connection_lists (L).
    Then every worker creates a tensor of size Lx2, where each row
    represents a connection, and fills up this tensor depending on how
    large its own connection list is. The worker(s) w/ the largest
    connection list will fill up the entire tensor.

    After constructing this list, an all_gather is performed, after
    which each worker has an identical NxLx2 output, where N is the
    number of workers (world_size), and each index of output represents
    a worker's connection list. For i=self.rank, the output will be
    identical to the workers local connection list.

    Each worker then iterates in the same order over the connections
    list, checking if each connection has been created yet (every
    connection will appear twice in the output), and creating a new
    process group if one doesn't exist for that connection, for both
    the forward and backward direction. Since ranks within process
    groups must always be identical, the smaller rank always goes
    first, followed by the larger rank.
    """
    if self.num_ranks_in_server == 1:
        return

    print("Setting up process groups for broadcasts...")

    # Figure out the size of the largest connection list that any
    # worker has (L).
    connection_list_size = torch.tensor(
        len(self.connection_list), dtype=torch.int)
    if self.backend == NCCL:
        connection_list_size = connection_list_size.cuda()
    gathered_connection_list_sizes = [
        torch.ones_like(connection_list_size)
        for _ in range(self.world_size)]
    # gathered_connection_list_sizes is the collection of
    # connection_list_size across all nodes.
    dist.all_gather(gathered_connection_list_sizes,
                    connection_list_size)
    max_connection_list_size = max(
        gathered_connection_list_sizes)

    if max_connection_list_size == 0:
        return

    # Build tensor to send local connection list to all other workers.
    connection_list_tensor = torch.ones([max_connection_list_size, 2],
                                        dtype=torch.int) * -1
    # Move the tensor onto the GPU for the NCCL backend.
    if self.backend == NCCL:
        connection_list_tensor = connection_list_tensor.cuda()
    if len(self.connection_list) > 0:
        connection_list_tensor[0:len(self.connection_list)] = \
            torch.IntTensor(self.connection_list)

    # Gather connection lists of all workers.
    aggregated_connection_list = [
        torch.ones_like(connection_list_tensor)
        for _ in range(self.world_size)]
    dist.all_gather(aggregated_connection_list,
                    connection_list_tensor)

    # Construct identical process groups on each worker.
    local_rank_connections = 0
    for src_rank in range(len(aggregated_connection_list)):
        for connection in aggregated_connection_list[src_rank]:
            # Get the tag and the destination rank of this connection.
            tag = int(connection[0])
            dst_rank = int(connection[1])

            if tag == -1:
                assert dst_rank == -1
                continue

            min_rank = min(src_rank, dst_rank)
            max_rank = max(src_rank, dst_rank)
            assert min_rank != max_rank

            if min_rank not in self.process_groups:
                self.process_groups[min_rank] = {}

            if max_rank not in self.process_groups[min_rank]:
                self.process_groups[min_rank][max_rank] = {}

            if tag not in self.process_groups[min_rank][max_rank]:
                # Create two process groups (forward and backward) for this tag.
                sub_process_group_fwd = dist.new_group(
                    ranks=[min_rank, max_rank])
                sub_process_group_bwd = dist.new_group(
                    ranks=[min_rank, max_rank])

                # Register the process groups.
                self.process_groups[min_rank][max_rank][tag] = {
                    'forward': sub_process_group_fwd,
                    'backward': sub_process_group_bwd
                }

                if min_rank == self.rank or max_rank == self.rank:
                    local_rank_connections += 1
    assert local_rank_connections == len(self.connection_list)

Where are these process groups used? In functions such as recv_helper_thread_args, for example:

def recv_helper_thread_args(self, tensor_name, index, dtype, backward,
                            num_iterations):
    if backward:
        src_rank = self.send_ranks[tensor_name][index]
    else:
        src_rank = self.receive_ranks[tensor_name][index]

    sub_process_group = None
    # Get the tag of this tensor.
    tag = self.tensor_tags[tensor_name]
    if self.is_gpu_to_gpu_comm(connected_rank=src_rank) and tensor_name != "ack":
        min_rank = min(self.rank, src_rank)
        max_rank = max(self.rank, src_rank)
        if src_rank > self.rank:
            # Get the process group registered under this tag; the caller
            # will then use this process group to communicate.
            sub_process_group = \
                self.process_groups[min_rank][max_rank][tag]['backward']
        else:
            sub_process_group = \
                self.process_groups[min_rank][max_rank][tag]['forward']
        assert sub_process_group

    if backward:
        queue = self.backward_receive_queues[tensor_name][index]
    else:
        queue = self.forward_receive_queues[tensor_name][index]
    tensor_shape = self.tensor_shapes[tensor_name]

    return (queue, self.counter, self.local_rank, tensor_name,
            src_rank, tag, tensor_shape, dtype, sub_process_group,
            num_iterations)

3.5 Starting helper threads

Helper threads are started by start_helper_threads. These helper threads perform the actual P2P transfers.

First, recall the rank lists: the key is the tensor name and the value is a list of ranks, for example:

receive_ranks = {dict: 3}
 'out8' = {list: 1} [2]
 'out9' = {list: 1} [2]
 'out10' = {list: 1} [2]
 __len__ = {int} 3

3.5.1 Creating a Thread

Remember the 4 queues we built earlier:

  • forward_receive_queues: queues for tensors received during forward propagation. Corresponds to receive_ranks.
  • backward_send_queues: queues for tensors sent during backward propagation. Corresponds to receive_ranks, because what is received from in the forward pass is the target to send to in the backward pass.
  • forward_send_queues: queues for tensors sent during forward propagation. Corresponds to send_ranks.
  • backward_receive_queues: queues for tensors received during backward propagation. Corresponds to send_ranks, because what is sent to in the forward pass is what is received from in the backward pass.

These four queues actually correspond to four different helper threads.

The idea is:

  • Process the receive ranks, i.e., iterate over the tensors in receive_ranks

    • For each tensor, walk through its ranks; for each rank

      • If backward processing is needed, create a backward send helper thread
      • Create a receive helper thread
  • Process the send ranks, i.e., iterate over the tensors in send_ranks

    • For each tensor, walk through its ranks; for each rank

      • If backward processing is needed, create a backward receive helper thread
      • Create a send helper thread
  • Process the target tensors

  • If running forward-only, ack helper threads are needed as a clocking mechanism

The specific code is:

def start_helper_threads(self, num_iterations, forward_only):
    """
    Start helper communication threads, one for each queue.
    """
    if forward_only:
        self.set_counter(self.num_forward_threads +
                         self.num_ack_threads)
        # For validation, receive acks in backward pass from next stage, send
        # acks in backward pass to next stage.
        self.receive_ranks["ack"] = self.ranks_in_previous_stage
        self.send_ranks["ack"] = self.ranks_in_next_stage
    else:
        self.set_counter(self.num_forward_threads +
                         self.num_backward_threads)
        if "ack" in self.receive_ranks:
            del self.receive_ranks["ack"]
        if "ack" in self.send_ranks:
            del self.send_ranks["ack"]

    (num_iterations_for_forward_threads,
     num_iterations_for_backward_threads) = \
        self.num_iterations_for_helper_threads(
            num_iterations=num_iterations)
    dtype = torch.float16 if self.fp16 else torch.float32

    # Setup helper threads for each tensor to be received and sent.
    for input_name in self.receive_ranks:
        if input_name in self.target_tensor_names or input_name == "ack":
            continue

        # Walk through the ranks corresponding to this tensor.
        for i in range(len(self.receive_ranks[input_name])):
            if not forward_only:
                # Backward processing is needed: create a backward send helper thread.
                self.start_helper_thread(
                    self.send_helper_thread_args,
                    send_helper_thread,
                    [input_name, i, True],
                    num_iterations_for_backward_threads)
            # Create a forward receive helper thread.
            self.start_helper_thread(
                self.recv_helper_thread_args,
                recv_helper_thread,
                [input_name, i,
                 self.training_tensor_dtypes[input_name],
                 False],
                num_iterations_for_backward_threads)

    for output_name in self.send_ranks:
        if output_name in self.target_tensor_names or output_name == "ack":
            continue

        # Walk through the ranks corresponding to this tensor.
        for i in range(len(self.send_ranks[output_name])):
            if not forward_only:
                # Backward processing is needed: create a backward receive helper thread.
                self.start_helper_thread(
                    self.recv_helper_thread_args,
                    recv_helper_thread,
                    [output_name, i,
                     self.training_tensor_dtypes[output_name],
                     True],
                    num_iterations_for_forward_threads)
            # Create a forward send helper thread.
            self.start_helper_thread(
                self.send_helper_thread_args,
                send_helper_thread,
                [output_name, i, False],
                num_iterations_for_forward_threads)

    # Helper threads for the target tensors.
    for target_tensor_name in self.target_tensor_names:
        if self.num_ranks_in_previous_stage > 0:
            for i in range(len(self.receive_ranks[target_tensor_name])):
                self.start_helper_thread(
                    self.recv_helper_thread_args,
                    recv_helper_thread,
                    [target_tensor_name, i, torch.int64, False],
                    num_iterations_for_backward_threads)
        if self.num_ranks_in_next_stage > 0:
            for i in range(len(self.send_ranks[target_tensor_name])):
                self.start_helper_thread(
                    self.send_helper_thread_args,
                    send_helper_thread,
                    [target_tensor_name, i, False],
                    num_iterations_for_forward_threads)

    # Start helper threads for ack for forward pass-only run as a clocking
    # mechanism.
    if forward_only:
        if "ack" in self.receive_ranks:
            for i in range(len(self.receive_ranks["ack"])):
                self.start_helper_thread(self.send_helper_thread_args,
                                         send_helper_thread,
                                         ["ack", i, True],
                                         num_iterations_for_backward_threads)
        if "ack" in self.send_ranks:
            for i in range(len(self.send_ranks["ack"])):
                self.start_helper_thread(self.recv_helper_thread_args,
                                         recv_helper_thread,
                                         ["ack", i, torch.int64, True],
                                         num_iterations_for_forward_threads)

The specific thread establishment function is:

def start_helper_thread(self, args_func, func, args_func_args, num_iterations):
    """
    Start passed-in func on a helper thread.
    """
    args_func_args += [num_iterations]
    # Build the arguments for the thread main function.
    args = args_func(*args_func_args)
    # Start the thread.
    helper_thread = threading.Thread(target=func,
                                     args=args)
    helper_thread.start()

3.5.2 Thread main function

recv_helper_thread and send_helper_thread are the receive and send helper threads; they call _recv and _send respectively to do the actual work.

Note that the thread arguments are built by dedicated functions: recv_helper_thread_args and send_helper_thread_args.

def recv_helper_thread(queue, counter, local_rank, tensor_name,
                       src_rank, tag, tensor_shape, dtype,
                       sub_process_group, num_iterations):
    torch.cuda.set_device(local_rank)
    # This method is to be executed from a helper daemon thread.
    for i in range(num_iterations):
        tensor = _recv(
            tensor_name, src_rank, tensor_shape=tensor_shape,
            dtype=dtype, tag=tag,
            sub_process_group=sub_process_group)
        queue.add(tensor)
    counter.decrement()
​
def send_helper_thread(queue, counter, local_rank, tensor_name,
                       src_rank, dst_rank, tag,
                       sub_process_group, num_iterations):
    torch.cuda.set_device(local_rank)
    # This method is to be executed from a helper daemon thread.
    for i in range(num_iterations):
        tensor = queue.remove()
        _send(tensor, tensor_name, src_rank, dst_rank,
              tag=tag,
              sub_process_group=sub_process_group)
    counter.decrement()
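The counter passed into these thread functions is PipeDream's thread-safe countdown counter: set_counter initializes it with the number of helper threads, and each thread calls decrement() when it has finished its iterations, so the runtime can wait for all helpers to drain. A minimal sketch of such a counter, assuming a decrement()/wait() interface (the exact implementation may differ):

import threading

class Counter(object):
    """Minimal sketch of a thread-safe countdown counter."""
    def __init__(self, initial_count):
        self.count = initial_count
        self.cv = threading.Condition()

    def decrement(self):
        with self.cv:
            self.count -= 1
            self.cv.notify_all()

    def wait(self):
        with self.cv:
            while self.count > 0:
                self.cv.wait()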

3.5.3 Build Parameters

Recall that in the create_process_groups method, two process groups are assigned to each tag; the helper threads later use these process groups to communicate:

if tag not in self.process_groups[min_rank][max_rank]:
    sub_process_group_fwd = dist.new_group(ranks=[min_rank, max_rank])
    sub_process_group_bwd = dist.new_group(ranks=[min_rank, max_rank])

    self.process_groups[min_rank][max_rank][tag] = {
        'forward': sub_process_group_fwd,
        'backward': sub_process_group_bwd
    }

The following functions build the arguments for the thread main functions. The basic logic is:

  • Using the tensor name, get the corresponding rank
  • Using the tensor name, get the corresponding tag
  • Using the tag, get the corresponding process group
  • Using the tensor name and index, get the corresponding queue
  • Return the parameters
def recv_helper_thread_args(self, tensor_name, index, dtype, backward,
                            num_iterations):
    # Use the tensor name to get the corresponding rank.
    if backward:
        src_rank = self.send_ranks[tensor_name][index]
    else:
        src_rank = self.receive_ranks[tensor_name][index]

    # Use the tensor name to get the corresponding tag.
    sub_process_group = None
    tag = self.tensor_tags[tensor_name]
    # Use the tag to get the corresponding process group.
    if self.is_gpu_to_gpu_comm(connected_rank=src_rank) and tensor_name != "ack":
        min_rank = min(self.rank, src_rank)
        max_rank = max(self.rank, src_rank)
        if src_rank > self.rank:
            sub_process_group = \
                self.process_groups[min_rank][max_rank][tag]['backward']
        else:
            sub_process_group = \
                self.process_groups[min_rank][max_rank][tag]['forward']
        assert sub_process_group

    # Get the corresponding queue.
    if backward:
        queue = self.backward_receive_queues[tensor_name][index]
    else:
        queue = self.forward_receive_queues[tensor_name][index]
    tensor_shape = self.tensor_shapes[tensor_name]

    # Return the parameters.
    return (queue, self.counter, self.local_rank, tensor_name,
            src_rank, tag, tensor_shape, dtype, sub_process_group,
            num_iterations)

def send_helper_thread_args(self, tensor_name, index, backward,
                            num_iterations):
    # Use the tensor name to get the corresponding rank.
    if backward:
        dst_rank = self.receive_ranks[tensor_name][index]
        num_ranks_in_connected_stage = self.num_ranks_in_previous_stage
    else:
        dst_rank = self.send_ranks[tensor_name][index]
        num_ranks_in_connected_stage = self.num_ranks_in_next_stage

    # Use the tensor name to get the corresponding tag.
    sub_process_group = None
    tag = self.tensor_tags[tensor_name]
    # Use the tag to get the corresponding process group.
    if self.is_gpu_to_gpu_comm(connected_rank=dst_rank) and tensor_name != "ack":
        min_rank = min(self.rank, dst_rank)
        max_rank = max(self.rank, dst_rank)
        if dst_rank > self.rank:
            sub_process_group = \
                self.process_groups[min_rank][max_rank][tag]['forward']
        else:
            sub_process_group = \
                self.process_groups[min_rank][max_rank][tag]['backward']
        assert sub_process_group

    # Get the corresponding queue.
    if backward:
        queue = self.backward_send_queues[tensor_name][index]
    else:
        queue = self.forward_send_queues[tensor_name][index]

    # Return the parameters.
    return (queue, self.counter, self.local_rank, tensor_name,
            self.rank, dst_rank, tag, sub_process_group, num_iterations)

0x04 Core functions

The following functions are the ones ultimately used to carry out the pipeline's send and receive logic.

Here, decoupling is done via queues:

  • recv and send operate on the queues, adding tensors to or taking tensors out of them.
  • The helper threads call _recv and _send and also operate on the queues.

Let's first look at the implementation of this Queue. Both add and remove use threading.Condition, which means that multiple threads can block and wait on a queue; producers and consumers coordinate through add/remove.

class Queue:
    def __init__(self):
        self.queue = []
        self.cv = threading.Condition()
​
    def add(self, tensor):
        self.cv.acquire()
        self.queue.append(tensor)
        self.cv.notify()
        self.cv.release()
​
    def remove(self):
        self.cv.acquire()
        while len(self.queue) == 0:
            self.cv.wait()
        tensor = self.queue.pop(0)
        self.cv.release()
        return tensor
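As a quick standalone illustration of how this Queue decouples the compute thread from a helper thread (a hypothetical example using the Queue class above):

import threading
import torch

q = Queue()   # the thread-safe Queue defined above

def consumer():
    t = q.remove()            # blocks on the condition variable until a tensor arrives
    print("got tensor of shape", tuple(t.shape))

helper = threading.Thread(target=consumer)
helper.start()
q.add(torch.zeros(2, 3))      # producer: appends the tensor and notifies the waiting consumer
helper.join()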

4.1 Sending Logic

The sending logic is as follows:

  1. The training code calls StageRuntime.run_backward.
  2. StageRuntime.run_backward calls StageRuntime.send_tensors_backward to send the tensor named tensor_name.
  3. send_tensors_backward calls CommunicationHandler.send, which adds this tensor to the CommunicationHandler member variable backward_send_queues[tensor_name][index]. Each tensor corresponds to several queues; this is where the decoupling happens.
  4. send calls backward_send_queues.add, which notifies the send_helper_thread blocked on that queue to start working.
  5. CommunicationHandler's send_helper_thread, which was blocked on the queue, takes the tensor out of backward_send_queues[tensor_name][index].
  6. send_helper_thread calls _send to send the tensor.
  7. The final call is dist.send, i.e., PyTorch P2P.

The details are as follows:

 StageRuntime            CommunicationHandler              send_helper_thread
​
      +                           +                                 +
      |                           |                                 |
      | 1                         |                                 |
      v                           |                                 |
 run_backward                     |                                 |
      |                           |                                 |
      | 2                         |                                 |
      |                           |                    wait on backward_send_queues
      v                  3        v                                 |
send_tensors_backward +--------> send                               |
                                  |                                 |
                                  |                                 |
                                  |  4                              |
                                  v               5                 v
               backward_send_queues.add(tensor) +----> tensor = queue.remove()
                                                notify              |
                                                                    |
                                                                    | 6
                                                                    v
                                                                  _send
                                                                    |
                                                                    | 7
                                                                    |
                                                                    v
                                                                 dist.send
​

4.2 Receiving logic

The receiving logic is as follows:

  1. The training code calls StageRuntime.run_backward.
  2. run_backward calls receive_tensors_backward.
  3. receive_tensors_backward calls self.gradients[output_name] = self.comm_handler.recv(...) to obtain the gradient. CommunicationHandler's recv function blocks on backward_receive_queues[tensor_name][index].
  4. Meanwhile, CommunicationHandler's recv_helper_thread thread calls _recv to receive tensors from other stages.
  5. _recv calls either dist.recv or dist.broadcast to receive the tensor.
  6. _recv adds the tensor to backward_receive_queues[tensor_name][index], which wakes up the blocked recv call of the CommunicationHandler.
  7. CommunicationHandler's recv function takes the gradient out of backward_receive_queues[tensor_name][index] and returns it to StageRuntime; this is the return of step 3.

The details are as follows:

 StageRuntime              CommunicationHandler             recv_helper_thread

      +                             +                                +
      |                             |                                |
      | 1                           |                                |
      v                             |                                | 4
 run_backward                       |                                v
      |                             |                              _recv
      | 2                           |                                |
      |                             |                                | 5
      |                             |                                v
      v                   3         v                  dist.recv / dist.broadcast
 receive_tensors_backward +------> recv                              |
                                    |                                |
                                    v                                |
                   backward_receive_queues.remove()                  |
                                    |                                |
                       wait on backward_receive_queues               |
                                    |                                |
                                    |                6               v
                    backward_receive_queues <----------+   queue.add(tensor)
                                    |           notify
                                    | 7
                                    v          3 return
                    gradients[output_name] <-----------------+

4.3 recv

Here, the corresponding tensor is fetched from the corresponding queue by its name.

def recv(self, tensor_name, forward_minibatch_id,
         backward_minibatch_id, backward=False):
    if backward:
        index = (backward_minibatch_id + self.rank_in_stage) % \
            len(self.backward_receive_queues[tensor_name])
        tensor = self.backward_receive_queues[tensor_name][
            index].remove()
        return tensor
    else:
        # In the forward pass, use the messaging schedule to pick the queue.
        index = self.get_messaging_index(sending=False)
        tensor = self.forward_receive_queues[tensor_name][
            index].remove()
        if tensor.dtype == torch.float32:
            tensor = tensor.requires_grad_()
        return tensor

At runtime, the receive_tensors_forward and receive_tensors_backward functions call recv to get the stored tensors from the corresponding queues. For example:

def receive_tensors_backward(self):
    # Receive all required gradients from downstream
    # machines.
    for output_name in self.send_ranks:
        if output_name in self.target_tensor_names:
            continue

        self.gradients[output_name] = \
            self.comm_handler.recv(
                output_name,
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=True)

        self.backward_stats.stats['receive_tensors_size'] += \
            (self.gradients[output_name].element_size() *
             self.gradients[output_name].nelement())

4.4 send

Here we’re putting tensors in the corresponding queue.

def send(self, tensor_name, tensor, forward_minibatch_id,
         backward_minibatch_id, backward=False):
    if backward:
        # In the backward pass, use the messaging schedule to pick the upstream rank.
        index = self.get_messaging_index(sending=True)
        dst_rank = self.receive_ranks[tensor_name][index]
        self.backward_send_queues[tensor_name][index].add(tensor)
    else:
        index = (forward_minibatch_id + self.rank_in_stage) % \
            len(self.send_ranks[tensor_name])
        self.forward_send_queues[tensor_name][index].add(tensor)

Used in send_tensors_backward, send_tensors_forward, for example:

    def send_tensors_backward(self):
        # Send all required gradients upstream.
        for input_name in self.receive_ranks:
            if input_name in self.target_tensor_names:
                continue
​
            self.comm_handler.send(
                input_name,
                self.gradients[input_name],
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=True)
​
            self.backward_stats.stats['send_tensors_size'] += \
                (self.gradients[input_name].element_size() *
                 self.gradients[input_name].nelement())
​
        if self.num_ranks_in_previous_stage > 0:
            # Used to track where to send tensors in the
            # backward pass.
            self.comm_handler.increment_messaging_index(
                sending=True)

4.5 _recv

In the _recv argument, sub_process_group is built in the code above.

If the two ranks are on the same node, dist.broadcast is used; otherwise dist.recv is used.

def _recv(tensor_name, src_rank, tensor_shape=None, dtype=torch.float32,
          tensor=None, tag=None, sub_process_group=None):
    """
    Receives tensor by calling PyTorch's recv() call.

    Tensor will be copied to GPU prior to return.
    """
    assert tag is not None
    if tensor is None:
        assert tensor_shape is not None
        assert dtype is not None
        assert dtype != torch.float16

    if sub_process_group is not None:
        # Receive tensor shape.
        received_tensor_shape = torch.zeros(len(tensor_shape),
                                            dtype=torch.int)
        dist.broadcast(tensor=received_tensor_shape,
                       src=src_rank,
                       group=sub_process_group)
        received_tensor_shape = list(map(lambda x: int(x),
                                         received_tensor_shape))

        # Receive tensor.
        tensor = torch.zeros(received_tensor_shape, dtype=dtype).cuda()
        dist.broadcast(tensor=tensor,
                       src=src_rank,
                       group=sub_process_group)
    else:
        # Receive tensor shape.
        received_tensor_shape = torch.zeros(len(tensor_shape),
                                            dtype=torch.int)
        dist.recv(tensor=received_tensor_shape,
                  src=src_rank,
                  tag=tag)
        received_tensor_shape = list(map(lambda x: int(x),
                                         received_tensor_shape))

        # Receive tensor.
        tensor = torch.zeros(received_tensor_shape, dtype=dtype)
        dist.recv(tensor=tensor,
                  src=src_rank,
                  tag=tag)
        tensor = tensor.cuda()

    assert tensor.is_cuda
    return tensor

_recv is called in recv_helper_thread.

def recv_helper_thread(queue, counter, local_rank, tensor_name,
                       src_rank, tag, tensor_shape, dtype,
                       sub_process_group, num_iterations):
    torch.cuda.set_device(local_rank)
    # This method is to be executed from a helper daemon thread.
    for i in range(num_iterations):
        tensor = _recv(
            tensor_name, src_rank, tensor_shape=tensor_shape,
            dtype=dtype, tag=tag,
            sub_process_group=sub_process_group)
        # Put the received tensor into the queue for the compute thread.
        queue.add(tensor)
    counter.decrement()

4.6 _send

If the two ranks are on the same node, dist.broadcast is used; otherwise dist.send is used.

def _send(tensor, tensor_name, src_rank, dst_rank, tag, sub_process_group=None):
    """
    Sends tensor by calling PyTorch's send() call.
​
    If tensor is being sent not via broadcast(), it will
    be first copied to the CPU.
    """
    if sub_process_group is not None:
        assert tensor.is_cuda
​
        # Send tensor shape.
        tensor_shape = torch.tensor(tensor.shape, dtype=torch.int)
        dist.broadcast(tensor=tensor_shape, src=src_rank,
                      group=sub_process_group)
​
        # Send tensor.
        contiguous_tensor = tensor.detach().clone()
        dist.broadcast(tensor=contiguous_tensor.contiguous(),
                       src=src_rank,
                       group=sub_process_group)
    else:
        assert tensor.is_cuda
        tensor = tensor.cpu()
​
        # Send tensor shape.
        tensor_shape = torch.tensor(tensor.shape, dtype=torch.int)
        dist.send(tensor=tensor_shape, dst=dst_rank, tag=tag)
​
        # Send tensor.
        dist.send(tensor=tensor, dst=dst_rank, tag=tag)

send_helper_thread takes the tensor from the queue and sends it out via _send.

def send_helper_thread(queue, counter, local_rank, tensor_name,
                       src_rank, dst_rank, tag,
                       sub_process_group, num_iterations):
    torch.cuda.set_device(local_rank)
    # This method is to be executed from a helper daemon thread.
    for i in range(num_iterations):
        # Fetch the tensor from the queue and send it out.
        tensor = queue.remove()
        _send(tensor, tensor_name, src_rank, dst_rank,
              tag=tag,
              sub_process_group=sub_process_group)
    counter.decrement()

At this point, the communication module has been analyzed; the next article will finally introduce 1F1B.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference