0x00 Abstract

Previous articles covered the basics of PyTorch pipeline parallelism and automatic balancing. This article looks at how a mini-batch is split into micro-batches and at the runtime system.

Other articles in the pipeline-parallelism series are listed below:

Deep learning pipeline parallel GPipe (1): basic pipeline implementation

Deep learning pipeline parallel GPipe (2): gradient accumulation

Deep learning pipeline parallel GPipe (3): recomputation

Deep learning pipeline parallel PipeDream (1): profile stage

Deep learning pipeline parallel PipeDream (2): computing partitions

Deep learning pipeline parallel PipeDream (3): model transformation

Deep learning pipeline parallel PipeDream (4): runtime engine

Deep learning pipeline parallel PipeDream (5): communication module

Deep learning pipeline parallel PipeDream (6): 1F1B strategy

PyTorch pipeline parallel implementation (1): basics

PyTorch pipeline parallel implementation (2): how to divide the model

Finally, the runtime system is as follows:

0x01 Splitting mini-batches

Let’s first look at how a mini-batch is split into multiple micro-batches.

1.1 Usage

As the sample code below shows, the scatter method does the splitting.

# Divide a mini-batch into micro-batches.
batches = microbatch.scatter(input, self.chunks)

# Run pipeline parallelism.
pipeline = Pipeline(batches,
                    self.partitions,
                    self.devices,
                    copy_streams,
                    self._skip_layout,
                    checkpoint_stop)
pipeline.run()

# Merge the micro-batches into one mini-batch.
output = microbatch.gather(batches)
return output

1.2 PyTorch basics

Let’s start with a few PyTorch basics.

1.2.1 chunk

The chunk method splits a tensor into chunks and returns them as a sequence of tensors. Its parameters are:

  • tensor: the tensor to split.
  • chunks: the number of chunks to produce.
  • dim: the dimension along which to split.

For example:

import numpy as np
import torch

data = torch.from_numpy(np.random.rand(3, 5))
print(str(data))
for i, data_i in enumerate(data.chunk(3, 0)):
    print(str(data_i))

Output:

tensor([[0.1208, 0.3428, 0.4586, 0.9372, 0.6410],
        [0.7889, 0.4480, 0.7607, 0.7903, 0.4118],
        [0.8391, 0.6649, 0.8338, 0.3477, 0.3953]], dtype=torch.float64)
tensor([[0.1208, 0.3428, 0.4586, 0.9372, 0.6410]], dtype=torch.float64)
tensor([[0.7889, 0.4480, 0.7607, 0.7903, 0.4118]], dtype=torch.float64)
tensor([[0.8391, 0.6649, 0.8338, 0.3477, 0.3953]], dtype=torch.float64)
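The example above divides evenly (3 rows into 3 chunks). When the size is not divisible, torch.chunk uses a chunk size of ceil(n / chunks), so the last chunk can be smaller and fewer chunks than requested may be returned. This matters for scatter, because the number of micro-batches can then be smaller than chunks. The sketch below mimics that sizing rule on a plain Python list; chunk_list is a hypothetical helper written for illustration, not a PyTorch function.

```python
import math
from typing import List, Sequence


def chunk_list(items: Sequence, chunks: int) -> List[list]:
    """Mimic the sizing rule of torch.chunk on a plain sequence (illustration only)."""
    if chunks <= 0:
        raise ValueError("chunks must be positive")
    # Every chunk except possibly the last has ceil(n / chunks) elements.
    size = math.ceil(len(items) / chunks)
    if size == 0:
        return []
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


rows = [0, 1, 2, 3, 4]
print(chunk_list(rows, 2))  # [[0, 1, 2], [3, 4]]
print(chunk_list(rows, 4))  # [[0, 1], [2, 3], [4]] -> only 3 chunks, not 4
```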

1.2.2 cat

cat concatenates a sequence of tensors along a given dimension.

Z = torch.cat((X, Y), 0)  # concatenate X and Y along dimension 0

Let’s use an example:

X = torch.ones(2, 5)
Y = torch.ones(4, 5)
Z = torch.cat((X, Y), 0)
print(Z)

The result is:

tensor([[1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.]])

1.3 Segmentation & aggregation

Back to splitting the mini-batch: let’s look at the scatter code.

def scatter(input: TensorOrTensors, chunks: int) -> List[Batch]:
    """Splits an input mini-batch into multiple micro-batches."""
    inputs: Iterable[TensorOrTensors]

    if isinstance(input, Tensor):
        inputs = input.chunk(chunks)
    else:
        rotated: List[Tensors] = []

        for tensor in input:  # input is a sequence of tensors: visit each one
            tensors = tensor.chunk(chunks)  # split each tensor into chunks
            rotated.append(cast(Tensors, tensors))  # one tuple of chunks per tensor

        inputs = zip(*rotated)  # regroup: one tuple of chunks per micro-batch

    return [Batch(x) for x in inputs]  # return the list of Batch objects

The gather method is the inverse of scatter.

def gather(outputs: List[Batch]) -> TensorOrTensors:
    """Concatenates output micro-batches into a mini-batch."""
    output: TensorOrTensors

    if outputs[0].atomic:
        tensors = tuple(b.tensor for b in outputs)
        output = torch.cat(tensors)
    else:
        rotated = [b.tensors for b in outputs]
        output_buf = []

        for tensors in zip(*rotated):
            output_buf.append(torch.cat(tensors))

        output = tuple(output_buf)

    return output
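Both functions rely on the same "rotation" trick with zip(*rotated), which is easier to see without tensors. The following is a minimal sketch on plain Python lists, assuming every input divides evenly; split, scatter_lists and gather_lists are hypothetical names used only for this illustration and are not part of torchgpipe.

```python
from typing import List, Tuple

Columns = Tuple[list, ...]


def split(rows: list, chunks: int) -> List[list]:
    """Split a list into `chunks` pieces; assumes len(rows) is divisible by chunks."""
    size = len(rows) // chunks
    return [rows[i * size:(i + 1) * size] for i in range(chunks)]


def scatter_lists(inputs: Columns, chunks: int) -> List[Columns]:
    # rotated[k] holds the pieces of the k-th input: one entry per input.
    rotated = [split(rows, chunks) for rows in inputs]
    # zip(*rotated) transposes it: one tuple per micro-batch.
    return list(zip(*rotated))


def gather_lists(batches: List[Columns]) -> Columns:
    # Transpose back, then concatenate the pieces of each input.
    return tuple(sum(pieces, []) for pieces in zip(*batches))


ab = ([1, 1], [0, 0, 0, 0])
a, b = scatter_lists(ab, chunks=2)
print(a)  # ([1], [0, 0])
print(b)  # ([1], [0, 0])
print(gather_lists([a, b]) == ab)  # True
```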

1.4 Analysis

Let’s see how this works in practice. The following test splits ab, a tuple of tensors, into two micro-batches.

def test_scatter_tuple():
    ab = (torch.ones(2, 1), torch.zeros(4, 2), torch.zeros(6, 3))

    a, b = scatter(ab, chunks=2)

    assert a.tensors[0].size() == (1, 1)
    assert b.tensors[0].size() == (1, 1)
    assert a.tensors[1].size() == (2, 2)
    assert b.tensors[1].size() == (2, 2)
    assert a.tensors[2].size() == (3, 3)
    assert b.tensors[2].size() == (3, 3)

So let’s draw a picture.

+-------------------------------------------------------------+
| ab                                                          |
|                                                             |
|    +---------+       +---------+       +---------+          |
|    |         |       |         |       |  0 0 0  |          |
|    |         |       |   0 0   |       |  0 0 0  |          |
|    |    1    |       |   0 0   |       |  0 0 0  |          |
|    |    1    |       |   0 0   |       |  0 0 0  |          |
|    |         |       |   0 0   |       |  0 0 0  |          |
|    |         |       |         |       |  0 0 0  |          |
|    +---------+       +---------+       +---------+          |
|                                                             |
+------------------------------+------------------------------+
                               |
                               |  a, b = scatter(ab, chunks=2)
                               |
                               v
+----------------------------+   +----------------------------+
| a                          |   | b                          |
|  +---+  +-----+  +-------+ |   |  +---+  +-----+  +-------+ |
|  | 1 |  | 0 0 |  | 0 0 0 | |   |  | 1 |  | 0 0 |  | 0 0 0 | |
|  +---+  | 0 0 |  | 0 0 0 | |   |  +---+  | 0 0 |  | 0 0 0 | |
|         +-----+  | 0 0 0 | |   |         +-----+  | 0 0 0 | |
|                  +-------+ |   |                  +-------+ |
+----------------------------+   +----------------------------+

You can also see how to aggregate using the sample code below.

def test_gather_tensors():
    a = torch.zeros(1, 1)
    b = torch.zeros(1, 1)
    ab = gather([Batch(a), Batch(b)])

    assert ab.size() == (2, 1)


def test_gather_tuples():
    a = (torch.zeros(1, 1), torch.zeros(2, 2))
    b = (torch.zeros(1, 1), torch.zeros(2, 2))
    ab = gather([Batch(a), Batch(b)])

    assert isinstance(ab, tuple)
    assert ab[0].size() == (2, 1)
    assert ab[1].size() == (4, 2)

0x02 Runtime

Let’s look at the runtime infrastructure: Stream, Task, and Worker.

2.1 Stream

The stream abstraction wraps CUDA streams and the CPU stream. The code is in torchgpipe/stream.py.

A CUDA stream is a queue of GPU operations, that is, a sequence of kernels bound to a device that execute in order. You can think of a stream as a task queue on the GPU: the user enqueues a series of operations, and the GPU executes them in the order they were added. Operations within the same stream are serialized and never run in parallel, so two operations can only run in parallel if they are in different streams. Kernels in different streams may be interleaved and may even overlap.

class CPUStreamType:
    pass

# The placeholder on place of streams for the CPU device instead of CUDA.
CPUStream = CPUStreamType()

# It represents both CUDA streams and the CPU stream.
AbstractStream = Union[torch.cuda.Stream, CPUStreamType]

torch.cuda.stream(stream) is a context manager that selects the given stream for the code inside the with block.

The related operation used in this article is use_stream.

@contextmanager
def use_stream(stream: AbstractStream) -> Generator[None, None, None]:
    """:func:`torch.cuda.stream` for either CPU or CUDA stream."""
    if not is_cuda(stream):
        yield
        return

    with torch.cuda.stream(as_cuda(stream)):
        yield
        
def is_cuda(stream: AbstractStream) -> bool:
    """Returns ``True`` if the given stream is a valid CUDA stream."""
    return stream is not CPUStream


def as_cuda(stream: AbstractStream) -> torch.cuda.Stream:
    """Casts the given stream as :class:`torch.cuda.Stream`."""
    return cast(torch.cuda.Stream, stream)        

2.2 Task

A Task represents how to compute a micro-batch on a partition. It consists of two parts:

  • compute: should be executed concurrently in worker threads.
  • finalize: should be executed after the worker threads have finished compute.

You can think of a Task as a unit of business logic. If you have Android experience, it resembles a Message: an Android Message is likewise a kind of task that bundles the information carried by the task with the handler that processes it.

When a Task is constructed, the compute and finalize callables are passed in, for example:

task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)

Or as follows:

def compute(batch: Batch = batch,
            partition: nn.Sequential = partition,
            skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
            ) -> Batch:
    with use_skip_tracker(skip_tracker):
        return batch.call(partition)

task = Task(streams[j], compute=compute, finalize=None)

Task is defined as follows. A Task is bound to a Stream, so it can run on any device.

class Task:
    """A task represents how to compute a micro-batch on a partition.

    It consists of two parts: :meth:`compute` and :meth:`finalize`.
    :meth:`compute` should be executed in worker threads concurrently.
    :meth:`finalize` should be executed after when worker threads complete to
    execute :meth:`compute`.

    :meth:`compute` might be boosted by worker threads. Because it produces
    several CUDA API calls by user code. In PyTorch, parallel CUDA API calls
    are not serialized through GIL. So more than one CUDA API call can be
    produced at the same time.

    """

    def __init__(self,
                 stream: AbstractStream,
                 *,
                 compute: Callable[[], Batch],
                 finalize: Optional[Callable[[Batch], None]],
                 ) -> None:
        self.stream = stream
        self._compute = compute
        self._finalize = finalize

    def compute(self) -> Batch:
        with use_stream(self.stream):  # bind to the stream
            return self._compute()  # call the business code that was passed in

    def finalize(self, batch: Batch) -> None:
        if self._finalize is None:
            return
        with use_stream(self.stream):  # bind to the stream
            self._finalize(batch)  # call the business code that was passed in
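To make the compute/finalize split concrete without a GPU, here is a minimal sketch. SimpleTask and use_fake_stream are hypothetical stand-ins for Task and use_stream: the "stream" is just a string, and entering it only records a log entry. The point is the calling pattern, with compute running in a worker thread and finalize running afterwards in the calling thread.

```python
import threading
from contextlib import contextmanager
from typing import Callable, Iterator, List, Optional

log: List[str] = []


@contextmanager
def use_fake_stream(name: str) -> Iterator[None]:
    """Hypothetical stand-in for use_stream: only records enter/exit."""
    log.append(f"enter {name}")
    try:
        yield
    finally:
        log.append(f"exit {name}")


class SimpleTask:
    """Simplified Task: both callbacks run inside the task's stream context."""

    def __init__(self, stream: str, *,
                 compute: Callable[[], int],
                 finalize: Optional[Callable[[int], None]]) -> None:
        self.stream = stream
        self._compute = compute
        self._finalize = finalize

    def compute(self) -> int:
        with use_fake_stream(self.stream):
            return self._compute()

    def finalize(self, result: int) -> None:
        if self._finalize is None:
            return
        with use_fake_stream(self.stream):
            self._finalize(result)


task = SimpleTask("stream-0",
                  compute=lambda: 6 * 7,
                  finalize=lambda r: log.append(f"finalize {r}"))

results: List[int] = []
# compute() runs in a worker thread ...
t = threading.Thread(target=lambda: results.append(task.compute()))
t.start()
t.join()
# ... and finalize() runs afterwards in the calling thread.
task.finalize(results[0])
print(log)
```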

2.3 Worker

A worker runs tasks. Each device has one worker, which is responsible for executing the tasks on that device. If you have Android experience, think of it as a Looper.

Note that worker is just a function. To run, it needs a thread to host it; creating that thread is the job of spawn_workers.

def worker(in_queue: InQueue,
           out_queue: OutQueue,
           device: torch.device,
           grad_mode: bool,
           ) -> None:
    """The main loop of a worker thread."""
    torch.set_grad_enabled(grad_mode)

    with use_device(device):
        while True:
            task = in_queue.get()  # take a task from the input queue

            if task is None:
                break

            try:
                batch = task.compute()  # run the task
            except Exception:
                exc_info = cast(ExcInfo, sys.exc_info())
                out_queue.put((False, exc_info))
                continue

            out_queue.put((True, (task, batch)))  # put the result into the output queue

    done = (False, None)
    out_queue.put(done)
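The loop protocol can be tried out with the standard library alone. The sketch below is a simplified version under stated assumptions: tasks are plain callables instead of Task objects, and device and grad-mode handling are left out. It shows the three kinds of message a worker emits: a success, a captured exception, and the final done marker.

```python
import sys
import threading
from queue import Queue


def worker(in_queue: Queue, out_queue: Queue) -> None:
    """Simplified worker loop: no device or grad-mode handling."""
    while True:
        task = in_queue.get()          # block until a task arrives
        if task is None:               # None is the shutdown signal
            break
        try:
            result = task()            # stands in for task.compute()
        except Exception:
            out_queue.put((False, sys.exc_info()))
            continue
        out_queue.put((True, (task, result)))
    out_queue.put((False, None))       # tell the consumer the worker is done


in_q: Queue = Queue()
out_q: Queue = Queue()
thread = threading.Thread(target=worker, args=(in_q, out_q), daemon=True)
thread.start()

in_q.put(lambda: 1 + 1)
in_q.put(lambda: 1 / 0)                # raises inside the worker
in_q.put(None)

first = out_q.get()
second = out_q.get()
done = out_q.get()
thread.join()

print(first[0], first[1][1])           # True 2
print(second[0], second[1][0])         # False <class 'ZeroDivisionError'>
print(done)                            # (False, None)
```

Passing the exception back as data, rather than letting it kill the thread, lets the main thread decide how to report it.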

2.4 Spawning workers

spawn_workers is decorated with @contextmanager, which turns a generator function into an object that implements the context-management protocol. Context managers are typically used to save and restore global state, close files, and so on; they offer a convenient wrapper around try…except…finally.
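As a quick reminder of how this decorator behaves, here is a minimal standard-library sketch; managed and events are illustrative names. Code before the yield runs on entry, the yielded value is bound by "as", and the finally clause runs on exit even when the body raises.

```python
from contextlib import contextmanager
from typing import Iterator, List

events: List[str] = []


@contextmanager
def managed(name: str) -> Iterator[str]:
    events.append(f"setup {name}")         # runs on entering the with block
    try:
        yield name                         # value bound by "as"
    finally:
        events.append(f"teardown {name}")  # runs even if the body raises


try:
    with managed("workers") as resource:
        events.append(f"using {resource}")
        raise RuntimeError("boom")
except RuntimeError:
    events.append("error handled")

print(events)
# ['setup workers', 'using workers', 'teardown workers', 'error handled']
```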

spawn_workers creates one Thread per device, and the function executed by that Thread is worker.

spawn_workers creates not only the workers but also a pair of message-queue lists (in_queues, out_queues), which exist for the whole pipeline run. Specifically:

  • spawn_workers internally creates an in_queue and an out_queue for each device, which guarantees that operations on a given device are executed serially.
in_queue, out_queue = workers[device]
  • These queues are appended to (in_queues, out_queues).
in_queues.append(in_queue)
out_queues.append(out_queue)
  • (in_queues, out_queues) then serve as the channel through which tasks and results are passed.

  • The order of in_queues follows the order of the devices, which is also the order of the partitions. The same holds for out_queues.

The specific code is as follows:

@contextmanager
def spawn_workers(devices: List[torch.device],
                  ) -> Generator[Tuple[List[InQueue], List[OutQueue]], None, None]:
    """Spawns worker threads. A worker thread is bound to a device."""
    in_queues: List[InQueue] = []
    out_queues: List[OutQueue] = []

    # Spawn workers.
    workers: Dict[torch.device, Tuple[InQueue, OutQueue]] = {}

    def normalize_device(device: torch.device) -> torch.device:
        if device.type == 'cuda' and device.index is None:
            return torch.device('cuda', index=torch.cuda.current_device())

        if device.type == 'cpu' and device.index is not None:
            return torch.device('cpu')

        return device

    for device in devices:
        device = normalize_device(device)

        try:
            in_queue, out_queue = workers[device]  # reuse the queues of this device
        except KeyError:
            # No queues exist for this device yet: create them and start a worker thread.
            in_queue = Queue()
            out_queue = Queue()
            workers[device] = (in_queue, out_queue)

            t = Thread(
                target=worker,
                args=(in_queue, out_queue, device, torch.is_grad_enabled()),
                daemon=True,
            )
            t.start()

        in_queues.append(in_queue)  # collect the queues, one entry per partition
        out_queues.append(out_queue)

    try:
        yield (in_queues, out_queues)  # hand the queues to the caller
    finally:
        # Close workers.
        for in_queue in set(in_queues):
            in_queue.put(None)

        # Join running workers.
        running = set(out_queues)
        while running:
            out_queue = running.pop()
            ok, payload = out_queue.get()

            done = (False, None)
            if (ok, payload) == done:
                continue

            running.add(out_queue)
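The same structure can be exercised without PyTorch. In the sketch below, which is an illustration under stated assumptions rather than torchgpipe's code, devices are plain strings such as "cpu:0" (hypothetical labels), tasks are plain callables, and error handling is omitted. It shows two properties of the design: partitions on the same device share one worker thread and one queue pair, and shutdown is a handshake in which the main thread sends None and waits for each done marker.

```python
import threading
from contextlib import contextmanager
from queue import Queue
from typing import Dict, Iterator, List, Tuple


def worker(in_queue: Queue, out_queue: Queue, device: str) -> None:
    while True:
        task = in_queue.get()
        if task is None:
            break
        out_queue.put((True, (device, task())))
    out_queue.put((False, None))


@contextmanager
def spawn_workers(devices: List[str]) -> Iterator[Tuple[List[Queue], List[Queue]]]:
    in_queues: List[Queue] = []
    out_queues: List[Queue] = []
    workers: Dict[str, Tuple[Queue, Queue]] = {}

    for device in devices:
        if device not in workers:          # one thread and queue pair per device
            workers[device] = (Queue(), Queue())
            threading.Thread(target=worker, args=(*workers[device], device),
                             daemon=True).start()
        in_queue, out_queue = workers[device]
        in_queues.append(in_queue)         # one entry per partition, in order
        out_queues.append(out_queue)

    try:
        yield in_queues, out_queues
    finally:
        for in_queue in set(in_queues):    # ask every worker to stop
            in_queue.put(None)
        for out_queue in set(out_queues):  # wait for each "done" marker
            while out_queue.get() != (False, None):
                pass


# Partitions 0 and 2 share "cpu:0", so they share one worker and one queue pair.
with spawn_workers(["cpu:0", "cpu:1", "cpu:0"]) as (in_queues, out_queues):
    shared = in_queues[0] is in_queues[2]
    in_queues[1].put(lambda: "hello")
    ok, (device, value) = out_queues[1].get()

print(shared, ok, device, value)  # True True cpu:1 hello
```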

2.5 Usage

2.5.1 When workers are spawned

The example is in torchgpipe/pipeline.py, where the run method of the Pipeline class spawns the workers. As we can see, what the Pipeline actually works with is the pair (in_queues, out_queues).

def run(self) -> None:
    """Runs pipeline parallelism.

    It modifies the given batches in place.

    """
    batches = self.batches
    partitions = self.partitions
    devices = self.devices
    skip_layout = self.skip_layout

    m = len(batches)
    n = len(partitions)

    skip_trackers = [SkipTrackerThroughPotals(skip_layout) for _ in batches]

    with spawn_workers(devices) as (in_queues, out_queues):  # create the workers and queues
        for schedule in clock_cycles(m, n):  # one schedule per clock cycle
            # Run fence and then compute for this schedule.
            self.fence(schedule, skip_trackers)
            # The queues are passed in here.
            self.compute(schedule, skip_trackers, in_queues, out_queues)
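run iterates over clock_cycles(m, n), which is not shown in this article. The following sketch generates such schedules under the usual GPipe ordering, in which micro-batch i reaches partition j at clock tick i + j. It is written here for illustration, with the hypothetical name clock_cycles_sketch, and may differ in detail from torchgpipe's own clock_cycles.

```python
from typing import Iterator, List, Tuple


def clock_cycles_sketch(m: int, n: int) -> Iterator[List[Tuple[int, int]]]:
    """Yield, per clock tick k, the (micro-batch i, partition j) pairs with i + j == k."""
    for k in range(m + n - 1):
        yield [(k - j, j) for j in range(max(1 + k - m, 0), min(1 + k, n))]


for tick, schedule in enumerate(clock_cycles_sketch(m=3, n=2)):
    print(tick, schedule)
# 0 [(0, 0)]
# 1 [(1, 0), (0, 1)]
# 2 [(2, 0), (1, 1)]
# 3 [(2, 1)]
```

Every pair within one tick targets a different partition j, which is why the tasks of a tick can be handed to different workers and run in parallel.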

2.5.2 Analysis

torchgpipe uses Python’s Queue data structure.

The Queue class implements a basic first-in, first-out (FIFO) container.

A multi-producer, multi-consumer queue.

The main methods are:

  • Queue.get(block=True, timeout=None) reads from the queue: it removes and returns the item at the head of the queue.
  • Queue.put(item, block=True, timeout=None) writes to the queue: it adds the item to the tail of the queue.
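A short standard-library example of this FIFO behaviour, including what happens when a non-blocking get finds the queue empty:

```python
from queue import Empty, Queue

q: Queue = Queue()
for item in ("a", "b", "c"):
    q.put(item)            # appended at the tail

print(q.get())             # a  (taken from the head)
print(q.get())             # b
print(q.get(timeout=0.1))  # c

try:
    q.get(block=False)     # queue is empty now
except Empty:
    print("empty")
```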

I prefer to think of (in_queues, out_queues) as something like Linux pipes.

A Linux pipe is a basic IPC mechanism that transfers data between related processes. Its features are:

  • A pipe is a FIFO buffer managed by the kernel, like a length of pipe placed in memory. Two processes sit at opposite ends and pass information through it.
  • One end of the pipe is connected to the output of a process, which writes information into the pipe. When the pipe is full, a process trying to write waits until the process at the other end has read some data out.
  • The other end is connected to the input of another process, which reads the information written into the pipe. When the pipe is empty, a process trying to read waits until the process at the other end writes data in.
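The blocking behaviour described above can be observed with os.pipe. Pipes normally connect two processes; to keep this sketch self-contained it uses a thread as the writer instead, which is an assumption made purely for brevity.

```python
import os
import threading

read_fd, write_fd = os.pipe()      # kernel-managed FIFO buffer with two ends


def producer() -> None:
    with os.fdopen(write_fd, "w") as writer:
        for i in range(3):
            writer.write(f"message {i}\n")
    # Closing the write end lets the reader see end-of-file.


thread = threading.Thread(target=producer)
thread.start()

with os.fdopen(read_fd) as reader:
    lines = [line.strip() for line in reader]   # blocks until data or EOF

thread.join()
print(lines)  # ['message 0', 'message 1', 'message 2']
```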

Returning to torchgpipe, here is an excerpt from the paper:

For this fine-grained order control, torchgpipe implements checkpointing using two separate autograd functions, Checkpoint and Recompute. When a task executes, a pair of Checkpoint and Recompute sharing memory is generated. The shared memory is used during backpropagation to pass the local computation graph produced by Recompute to Checkpoint.

Because there is so much to run in parallel, the compute method of the Pipeline class puts tasks into in_queues and takes the results from out_queues:

def compute(self,
            schedule: List[Tuple[int, int]],
            skip_trackers: List[SkipTrackerThroughPotals],
            in_queues: List[InQueue],
            out_queues: List[OutQueue],
            ) -> None:
    # With checkpointing, the autograd graph looks like this diagram:
    # ┌─────┸──────┐
    # │    Copy    │
    # └─────┰──────┘   (fence)
    # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
    #       ┃          (compute)
    # ┌─────┸──────┐
    # │    Wait    │ [1] Synchronize the current stream with the copy stream.
    # └─────┰──────┘
    # ┌─────┸──────┐
    # │ Checkpoint │ [2] Compute a partition within checkpointing.
    # └─────┰──────┘
    # ┌─────┸──────┐
    # │    Wait    │ [3] Synchronize the copy stream with the current stream.
    # └─────┰──────┘
    #       ┠ ─ ─ ─ ┐
    #       ┃ ┌─────┴─────┐
    #       ┃ │ Recompute │ [4] Schedule the recomputation at backpropagation.
    #       ┃ └─────┬─────┘
    #       ┠ ─ ─ ─ ┘
    #       ┃
    # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
    # ┌─────┸──────┐   (fence)
    # │    Copy    │
    # └─────┰──────┘
    for i, j in schedule:
        batch = batches[i]
        partition = partitions[j]

        # Synchronize with the copied input. ([1] in the diagram)
        if j != 0:
            wait(batch, copy_streams[j][i], streams[j])

        # Determine whether checkpointing or not.
        checkpoint = (i < checkpoint_stop)
        if checkpoint:
            def function(input: TensorOrTensors,
                         partition: nn.Sequential = partition,
                         skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                         ) -> TensorOrTensors:
                with use_skip_tracker(skip_tracker):
                    return partition(input)

            chk = Checkpointing(function, batch)
            task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)
            del function, chk

        else:
            def compute(batch: Batch = batch,
                        partition: nn.Sequential = partition,
                        skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                        ) -> Batch:
                with use_skip_tracker(skip_tracker):
                    return batch.call(partition)

            task = Task(streams[j], compute=compute, finalize=None)
            del compute

        # Compute tasks in parallel. ([2] in the diagram)
        # Put the new task into the queue of the j-th partition. Because i and j
        # were fixed by the clock algorithm, forward propagation follows that order.
        in_queues[j].put(task)

    for i, j in schedule:
        ok, payload = out_queues[j].get()
        # subsequent code omitted

2.6 Summary

Let’s summarize the overall flow (later articles go into more detail):

  1. The system calls spawn_workers to create the workers.
  2. spawn_workers creates one Thread per device, and the function executed by that Thread is worker. It also creates an in_queue and an out_queue for each device, which guarantees that operations on a given device are executed serially.
  3. These queues are appended to (in_queues, out_queues), which are then returned to the Pipeline main thread and used as the channel for passing tasks and results.
  4. Once the Pipeline main thread has (in_queues, out_queues), the compute method runs a Task by putting it into the in_queue of the corresponding device.
  5. The worker thread blocks on its in_queue; when a Task arrives, it reads the Task and runs it.
  6. The worker thread puts the result into the out_queue.
  7. Pipeline’s compute method takes the result from the out_queue for further processing.

As shown below:

+------------------------------------------+                             +------------------------------------------+
| Pipeline                                 |              1              | spawn_workers                            |
|                                          +---------------------------->|                                          |
|   spawn_workers(devices)                 |  3 (in_queues, out_queues)  |   workers                                |
|                                          |<----------------------------+     device 1 : in_queue 1, out_queue 1   |
| +--------------------------------------+ |                             |     device 2 : in_queue 2, out_queue 2   |
| | compute                              | |                             |     device 3 : in_queue 3, out_queue 3   |
| |                                      | |                             |                                          |
| |   in_queues[j].put(task)             | |                             |  t = Thread(                             |
| |                                      | |                             |      target=worker,                      |
| |   ok, payload = out_queues[j].get()  | |                             |      args=(in_queue, out_queue, device), |
| |                                      | |                             |      daemon=True,                        |
| +--------+-------------------^---------+ |                             |  )                                       |
|          |                   |           |                             |  t.start()                               |
+----------+-------------------+-----------+                             +---------------------+--------------------+
           | 4                 | 7                                                             | 2
           v                   |                                                               |
+----------+----------+  +-----+---------------+                                               |
| in_queues           |  | out_queues          |                                               |
|   in_queue 1        |  |   out_queue 1       |                                               |
|   in_queue 2        |  |   out_queue 2       |                                               |
|   in_queue 3        |  |   out_queue 3       |                                               |
+----------+----------+  +-----^---------------+                                               |
           | 5  task           | 6  (True, (task, batch))                                      |
           v                   |                                                               |
+----------+-------------------+---------------+                                               |
| Thread                                       |                                               |
|   worker (device 1)                          |<----------------------------------------------+
|     task = in_queue.get()                    |
|     batch = task.compute()                   |
|     out_queue.put((True, (task, batch)))     |
+----------------------------------------------+


So far we have looked at how mini-batches are split and at some of the runtime mechanisms. The next article looks at the implementation alongside the paper.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts
