0x00 Abstract

The previous articles covered the basics of PyTorch pipeline parallelism, automatic balancing, and data splitting. In this article, we look at how torchgpipe implements parallel execution.

Other articles in this pipeline-parallelism series are linked below:

Deep Learning Pipeline Parallelism GPipe (1) -- Basic pipeline implementation

Deep Learning Pipeline Parallelism GPipe (2) -- Gradient accumulation

Deep Learning Pipeline Parallelism GPipe (3) -- Recomputation

Deep Learning Pipeline Parallelism PipeDream (1) -- Profile stage

Deep Learning Pipeline Parallelism PipeDream (2) -- Computing partitions

Deep Learning Pipeline Parallelism PipeDream (3) -- Model transformation

Deep Learning Pipeline Parallelism PipeDream (4) -- Runtime engine

Deep Learning Pipeline Parallelism PipeDream (5) -- Communication module

Deep Learning Pipeline Parallelism PipeDream (6) -- 1F1B strategy

PyTorch Pipeline Parallelism Implementation (1) -- Basics

PyTorch Pipeline Parallelism Implementation (2) -- How to divide the model

PyTorch Pipeline Parallelism Implementation (3) -- Splitting data and the runtime system

PyTorch Pipeline Parallelism Implementation (4) -- Forward computation

PyTorch Pipeline Parallelism Implementation (5) -- Computing dependencies

The images are from the paper and github source code.

0x01 Overall Architecture

Let’s start with a look at Torchgpipe as a whole.

1.1 Usage

We use a test example from the source code for analysis. The example builds a Sequential model of three layers; once wrapped by GPipe, the model can run forward and backward propagation.

class Layer1(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv = nn.Conv2d(3, 3, 1)

    def forward(self, input):
        yield stash('1to3', input)
        output = self.conv(input)
        return output


class Layer2(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv = nn.Conv2d(3, 3, 1)

    def forward(self, input):
        output = self.conv(input)
        return output


class Layer3(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv = nn.Conv2d(3, 3, 1)

    def forward(self, input):
        skip_1to3 = yield pop('1to3')
        output = self.conv(input) + skip_1to3
        return output


model = nn.Sequential(Layer1(), Layer2(), Layer3())   # build a Sequential model
model = GPipe(model, balance, chunks=3)               # wrap it with GPipe

in_device = model.devices[0]
out_device = model.devices[-1]

input = torch.rand(30, 3, 224, 224, device=in_device, requires_grad=True)
output = model(input)    # calls GPipe.forward
loss = output.mean()
loss.backward()          # backward propagation happens here

1.2 Forward propagation

The forward propagation of GPipe does the following:

  • Use the scatter function to split the input mini-batch into micro-batches.
  • Use the _ensure_copy_streams method to create a new CUDA stream for each device.
  • Build a Pipeline and run it.
  • After the batches are finished, use the gather method to merge the micro-batches back into a mini-batch.

So for each forward pass, a Pipeline instance is created to perform the computation, and its result is returned to the caller.

def forward(self, input: TensorOrTensors) -> TensorOrTensors:  # type: ignore
    """:class:`GPipe` is a fairly transparent module wrapper. It doesn't
    modify the input and output signature of the underlying module. But
    there's type restriction. Input and output have to be a
    :class:`~torch.Tensor` or a tuple of tensors. This restriction is
    applied at partition boundaries too.

    Args:
        input (torch.Tensor or tensors): input mini-batch

    Returns:
        tensor or tensors: output mini-batch

    Raises:
        TypeError: input is not a tensor or tensors.

    """
    microbatch.check(input)

    if not self.devices:
        # Empty sequential module is not illegal.
        return input

    # Divide a mini-batch into micro-batches.
    batches = microbatch.scatter(input, self.chunks)

    # Separate CUDA streams for copy.
    copy_streams = self._ensure_copy_streams()

    # The micro-batch index where the checkpointing stops.
    if self.training:
        checkpoint_stop = {
            'always': self.chunks,
            'except_last': self.chunks-1,
            'never': 0,
        }[self.checkpoint]
    else:
        checkpoint_stop = 0

    # Run pipeline parallelism.
    pipeline = Pipeline(batches,
                        self.partitions,
                        self.devices,
                        copy_streams,
                        self._skip_layout,
                        checkpoint_stop)
    pipeline.run()

    # Merge the micro-batches into one mini-batch.
    output = microbatch.gather(batches)
    return output

The _ensure_copy_streams method simply generates a new CUDA stream for each device.

def _ensure_copy_streams(self) -> List[List[AbstractStream]]:
    """Ensures that :class:`GPipe` caches CUDA streams for copy.

    It's worth to cache CUDA streams although PyTorch already manages a
    pool of pre-allocated CUDA streams, because it may reduce GPU memory
    fragementation when the number of micro-batches is small.

    """
    if not self._copy_streams:
        for device in self.devices:
            self._copy_streams.append([new_stream(device) for _ in range(self.chunks)])

    return self._copy_streams

1.3 Pipeline class

The run method of the Pipeline class launches the computation clock cycle by clock cycle, so that in forward propagation the execution spreads across the devices like ripples on water.

def run(self) -> None:
    """Runs pipeline parallelism.

    It modifies the given batches in place.

    """
    batches = self.batches
    partitions = self.partitions
    devices = self.devices
    skip_layout = self.skip_layout

    m = len(batches)
    n = len(partitions)
    skip_trackers = [SkipTrackerThroughPotals(skip_layout) for _ in batches]

    with spawn_workers(devices) as (in_queues, out_queues):
        for schedule in clock_cycles(m, n):
            # clock_cycles yields the schedule for each clock tick,
            # which is then executed step by step.
            self.fence(schedule, skip_trackers)
            self.compute(schedule, skip_trackers, in_queues, out_queues)

1.3.1 Building dependencies

In Pipeline, the fence method (part of the code is omitted) uses the depend function to build back-propagation dependencies.

def fence(self,
          schedule: List[Tuple[int, int]],
          skip_trackers: List[SkipTrackerThroughPotals],
          ) -> None:
    """Copies micro-batches after computation for the previous
    micro-batches.
    """
    batches = self.batches
    copy_streams = self.copy_streams
    skip_layout = self.skip_layout

    for i, j in schedule:
        # Ensure that batches[i-1] is executed after batches[i] in
        # backpropagation by an explicit dependency.
        if i != 0:
            depend(batches[i-1], batches[i])

        next_stream = copy_streams[j][i]

        for prev_j, ns, name in skip_layout.copy_policy(j):
            prev_stream = copy_streams[prev_j][i]
            skip_trackers[i].copy(batches[i], prev_stream, next_stream, ns, name)

        if j != 0:
            prev_stream = copy_streams[j-1][i]
            copy(batches[i], prev_stream, next_stream)  # copy the micro-batch to the next partition's stream

1.3.2 Queue

The Worker threads interact with the main thread through Python’s Queue data structure. The Queue class implements a basic first-in, first-out (FIFO) container: put() adds an element to the tail of the queue, and get() removes an element from the head.

Its docstring describes it as "A multi-producer, multi-consumer queue." The two key methods are:

  • get([block, [timeout]]) reads from the queue; it blocks if the queue is empty (timeout is the maximum waiting time).
  • put(item, [block, [timeout]]) writes to the queue; it blocks if the queue is full (timeout is the maximum waiting time).

A minimal sketch of how the main thread and a worker thread use these queues is shown below.
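The sketch below is self-contained and illustrative only; torchgpipe's actual worker (in torchgpipe/worker.py) additionally binds a CUDA device to the thread and packs richer exception information, but the queue protocol is the same: the worker blocks on in_queue, executes whatever it receives, and reports through out_queue, while the main thread only enqueues work and later collects results.

import queue
import threading

def worker(in_queue: "queue.Queue", out_queue: "queue.Queue") -> None:
    """Simplified worker loop: block on in_queue, run the task, report the result."""
    while True:
        task = in_queue.get()          # blocks until the main thread inserts a task
        if task is None:               # poison pill: shut the worker down
            break
        try:
            result = task()            # in torchgpipe this is task.compute()
            out_queue.put((True, result))
        except Exception as exc:       # report the exception instead of crashing the thread
            out_queue.put((False, exc))

in_queue: "queue.Queue" = queue.Queue()
out_queue: "queue.Queue" = queue.Queue()
threading.Thread(target=worker, args=(in_queue, out_queue), daemon=True).start()

in_queue.put(lambda: 1 + 1)            # the main thread inserts a task and returns immediately
ok, payload = out_queue.get()          # later it fetches the result (blocks until available)
print(ok, payload)                     # True 2
in_queue.put(None)                     # stop the worker

Because put() returns immediately, the main thread can hand tasks to every device's queue before collecting any result, which is exactly what the compute method below relies on.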

1.3.3 Computation

The training computation itself is done by the compute method.

def compute(self,
            schedule: List[Tuple[int, int]],
            skip_trackers: List[SkipTrackerThroughPotals],
            in_queues: List[InQueue],
            out_queues: List[OutQueue],
            ) -> None:
    """Runs tasks with synchronization to copy streams."""
    batches = self.batches
    partitions = self.partitions
    devices = self.devices
    copy_streams = self.copy_streams
    checkpoint_stop = self.checkpoint_stop

    n = len(partitions)
    streams = [current_stream(d) for d in devices]
    exc_info: Optional[ExcInfo] = None

    # With checkpointing, the autograd graph looks like this diagram:
    # ┌─────┸──────┐
    # │    Copy    │
    # └─────┰──────┘   (fence)
    # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
    #       ┃          (compute)
    # ┌─────┸──────┐
    # │    Wait    │ [1] Synchronize the current stream with the copy stream.
    # └─────┰──────┘
    # ┌─────┸──────┐
    # │ Checkpoint │ [2] Compute a partition within checkpointing.
    # └─────┰──────┘
    # ┌─────┸──────┐
    # │    Wait    │ [3] Synchronize the copy stream with the current stream.
    # └─────┰──────┘
    #       ┠ ─ ─ ─ ┐
    #       ┃ ┌─────┴─────┐
    #       ┃ │ Recompute │ [4] Schedule the recomputation at backpropagation.
    #       ┃ └─────┬─────┘
    #       ┠ ─ ─ ─ ┘
    #       ┃
    # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
    # ┌─────┸──────┐   (fence)
    # │    Copy    │
    # └─────┰──────┘
    for i, j in schedule:
        batch = batches[i]
        partition = partitions[j]

        # Synchronize with the copied input. ([1] in the diagram)
        if j != 0:
            wait(batch, copy_streams[j][i], streams[j])

        # Determine whether checkpointing or not.
        checkpoint = (i < checkpoint_stop)
        if checkpoint:
            def function(input: TensorOrTensors,
                         partition: nn.Sequential = partition,
                         skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                         ) -> TensorOrTensors:
                with use_skip_tracker(skip_tracker):
                    return partition(input)

            chk = Checkpointing(function, batch)
            task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)
            del function, chk

        else:
            def compute(batch: Batch = batch,
                        partition: nn.Sequential = partition,
                        skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                        ) -> Batch:
                with use_skip_tracker(skip_tracker):
                    return batch.call(partition)

            task = Task(streams[j], compute=compute, finalize=None)
            del compute

        # Compute tasks in parallel. ([2] in the diagram)
        in_queues[j].put(task)

    for i, j in schedule:
        ok, payload = out_queues[j].get()

        # Hold the first exception.
        if exc_info is not None:
            continue
        elif not ok:
            exc_info = cast(ExcInfo, payload)
            continue

        task, batch = cast(Tuple[Task, Batch], payload)

        # The copy stream synchronizes to copy the output. ([3] in the diagram)
        if j != n-1:
            wait(batch, streams[j], copy_streams[j][i])

        # Finalize tasks. If checkpointing is enabled, here the
        # recomputation is scheduled at backpropagation. ([4] in the diagram)
        with use_device(devices[j]):
            task.finalize(batch)

        batches[i] = batch

    # Fail at the first exception.
    if exc_info is not None:
        raise exc_info[0].with_traceback(exc_info[1], exc_info[2])

Let’s summarize the overall business logic (the parallel logic):

  1. The system calls spawn_workers to create the workers.

    1. Inside spawn_workers, a worker thread is spawned for each device, and an in_queue / out_queue pair is created for each device, which guarantees that work on each device is executed serially.
    2. These queues are collected into (in_queues, out_queues), which is returned to the Pipeline main thread. (in_queues, out_queues) is then used as the context for passing information between tasks.
  2. After the Pipeline main thread obtains (in_queues, out_queues), it uses the clock_cycles algorithm to generate a series of iterations, one schedule per clock tick.

  3. For each iteration (schedule), fence copies data between streams and sets up dependencies, and compute performs the training. This launches multiple compute calls in sequence.

  4. Each compute builds a Task for a given (i, j), finds the in_queue of the corresponding device, and inserts the Task into it.

  5. The Worker Thread blocks on its in_queue; once something arrives, it reads the Task and runs it. The compute calls are issued sequentially, but since compute only inserts into a queue, it returns immediately; the worker threads blocking on the queues can then train in parallel.

  6. The Worker Thread inserts the result into its out_queue.

  7. The compute method fetches the results from the out_queues for subsequent processing.

The details are shown below.

+---------------------------------------------------+  1   +--------------------------------------+
| Pipeline (main thread)                            +----->+ spawn_workers                        |
|                                                   |      |                                      |
|   spawn_workers(devices)                          |  2   |  device 1 : in_queue 1, out_queue 1  |
|                                                   +<-----+  device 2 : in_queue 2, out_queue 2  |
|   for schedule in clock_cycles(m, n):             |      |  device 3 : in_queue 3, out_queue 3  |
|       fence(schedule, ...)                        |      |  (one Worker thread per device)      |
|       compute(schedule, ...)          3           |      +--------------------------------------+
|           |                                       |
|           |    in_queues[j].put(task)             |
|           |    out_queues[j].get()                |
+-----------+---------------------------------------+
            |                    ^
          4 |                    | 7
            v                    |
   +--------+--------------------+--------+
   |     in_queues          out_queues    |
   +--------+--------------------+--------+
            |                    ^
          5 |                    | 6
            v                    |
   +--------+--------------------+-----------------+
   | Worker thread (one per device)                |
   |                                               |
   |     task = in_queue.get()                     |
   |     batch = task.compute()                    |
   |     out_queue.put((task, batch))              |
   +-----------------------------------------------+


0x02 Parallel Copy and Calculation

Let’s move on to Concurrent Copy and Computation: Streams.

2.1 GPU Parallel Operations

Let’s start by looking at the parallel operation capabilities offered by the GPU.

A CUDA stream represents a GPU operation queue, that is, a sequence of kernels bound to a device and executed in order. We can think of a stream as a task queue on the GPU: the user adds a series of operations to the stream, and the GPU executes them in the order they were added. Within a single stream, all operations are serialized and never run in parallel; for two operations to run in parallel, they must therefore be issued to different streams. Kernels in different streams can be interleaved and may even overlap.

Almost all CUDA devices with compute capability 1.1 or higher support concurrent copy and execution, known as device overlap, which has the following characteristics:

  1. Data copying and numerical computation can be done in parallel.
  2. Both directions can be copied in parallel (GPU to CPU, CPU to GPU).
  3. The kernel that performs numerical calculations cannot read or write the data being copied.

Generally speaking, CPU memory is much larger than GPU memory, so the data usually cannot all be copied to the GPU at once. Device overlap can therefore greatly improve the execution efficiency of GPU programs, for example (a minimal code sketch is given after this list):

  1. The data is broken up into chunks, and each piece is handed over to a Stream for processing.

  2. Each Stream does the following:

    1. Copy data belonging to the Stream from host memory into device memory,
    2. GPU performs kernel operation and saves the calculation result to GPU memory.
    3. Copy the Stream calculation from device memory back to host memory.
  3. The GPU’s scheduler determines how streams are parallel.

  4. CPU operations can also be performed in parallel.
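As a concrete illustration of this pattern, here is a minimal PyTorch sketch (illustrative only, not torchgpipe code): it splits pinned host data into chunks and gives each chunk its own stream, so the host-to-device copies and kernels of different chunks can overlap; the device-to-host copy-back step is omitted for brevity.

import torch

device = torch.device('cuda')
data = torch.randn(4, 1024, 1024, pin_memory=True)        # pinned host memory, 4 chunks
streams = [torch.cuda.Stream(device) for _ in range(4)]   # one stream per chunk
results = []

for chunk, stream in zip(data, streams):
    with torch.cuda.stream(stream):
        x = chunk.to(device, non_blocking=True)            # async H2D copy issued on this stream
        results.append((x @ x).sum())                      # compute issued on the same stream

torch.cuda.synchronize()                                   # wait for all streams to finish
print(sum(r.item() for r in results))

Whether the copies and kernels actually overlap is decided by the GPU scheduler, as noted above; the streams only express that the chunks are independent.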

2.2 PyTorch

Unless otherwise specified, PyTorch issues every kernel bound to a device onto that device's default stream. Because forward propagation runs on the default stream, processing "prefetch of the next batch (copy CPU to GPU)" and "forward propagation of the current batch" in parallel requires that:

  • The batch data on the CPU must be pinned (page-locked). Page-locking lets the hardware access CPU memory directly, which removes some copy operations; page-locked pages cannot be swapped out to disk. Memory allocated on the GPU is page-locked by default.
  • The prefetch operation must be issued on another stream, as sketched below.
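A minimal sketch of this prefetch pattern follows (illustrative only, not torchgpipe code); copy_stream, prefetch, and the toy model are names made up for the example.

import torch

device = torch.device('cuda')
copy_stream = torch.cuda.Stream(device)                      # non-default stream for prefetching
model = torch.nn.Conv2d(3, 8, 3).to(device)
batches = [torch.randn(32, 3, 224, 224, pin_memory=True) for _ in range(3)]  # pinned CPU batches

def prefetch(batch_cpu: torch.Tensor) -> torch.Tensor:
    with torch.cuda.stream(copy_stream):                     # issue the H2D copy on the side stream
        return batch_cpu.to(device, non_blocking=True)

next_gpu = prefetch(batches[0])
for i in range(len(batches)):
    # The default stream must wait until the prefetch copy has finished.
    torch.cuda.current_stream(device).wait_stream(copy_stream)
    current = next_gpu
    current.record_stream(torch.cuda.current_stream(device)) # protect the memory from early reuse
    if i + 1 < len(batches):
        next_gpu = prefetch(batches[i + 1])                  # overlap: prefetch while computing
    out = model(current)                                     # forward pass on the default stream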

torchgpipe registers each copy kernel on a non-default stream while leaving the compute kernels on the default stream. This allows device j to compute while simultaneously sending data to and/or receiving data from other devices.

In addition, each device uses a different stream for each micro-batch. Since there are no true dependencies between different micro-batches, this use of streams is safe and lets the copies happen as early as possible. See the figure below.

The figure shows the timeline of device j, with and without a non-default stream used for copying:

  • Part (a): using only the default stream, a copy kernel may block the compute kernel (and vice versa) until the copy completes.
  • Part (b): with dedicated copy streams, computation can run at the same time as sending data to or receiving data from other devices.

2.3 Stream Encapsulation

Because torchgpipe operates on streams, it wraps some of the low-level stream operations. The stream-related code is mainly in torchgpipe/stream.py.

2.3.1 PyTorch Example

torchgpipe uses wait_stream and record_stream, and there is relatively little information about them online; digging into CUDA or PyTorch internals would take too long. So we look at how torch/nn/parallel/distributed.py uses them, from which we can see:

  • wait_stream acts as a wait: one stream waits until another stream finishes its queued work.
  • record_stream ensures that a tensor's memory is not reused until the work recorded on the stream completes. Combined with other material, we can understand it as ensuring that the operations recorded on a stream have finished before moving on.

The specific code is as follows:

# Perform CPU -> GPU copies in a background stream. This code is
# motivated from similar logic in torch/nn/parallel/_functions.py
stream = _get_stream(target_gpu)
with torch.cuda.stream(stream):
    output = obj.to(target_gpu)
# synchronize with the copy stream
with torch.cuda.device(target_gpu):
    current_stream = torch.cuda.current_stream()
    # Sync the current stream with the copy stream.
    current_stream.wait_stream(stream)      # wait for the copy to finish
    # Ensure tensor memory is not reused until work on main stream is complete.
    output.record_stream(current_stream)
return (output,)

2.3.2 Create / Get

The functions for generating and retrieving are:

  • new_stream creates a new stream.
  • current_stream returns the current stream.
  • default_stream returns the default stream.

def new_stream(device: torch.device) -> AbstractStream:
    """Creates a new stream for either CPU or CUDA device."""
    if device.type != 'cuda':
        return CPUStream
    return torch.cuda.Stream(device)

def current_stream(device: torch.device) -> AbstractStream:
    """:func:`torch.cuda.current_stream` for either CPU or CUDA device."""
    if device.type != 'cuda':
        return CPUStream
    return torch.cuda.current_stream(device)

def default_stream(device: torch.device) -> AbstractStream:
    """:func:`torch.cuda.default_stream` for either CPU or CUDA device."""
    if device.type != 'cuda':
        return CPUStream
    return torch.cuda.default_stream(device)

2.3.3 Record

The following method wraps CUDA's record_stream.

def record_stream(tensor: torch.Tensor, stream: AbstractStream) -> None:
    """:meth:`torch.Tensor.record_stream` for either CPU or CUDA stream."""
    if is_cuda(stream):
        # NOTE(sublee): record_stream() on a shifted view tensor throws
        # RuntimeError in PyTorch 1.1.0, and does nothing in 1.2.0. To safely
        # protect the tensor against unexpected reallocation, here we use a
        # temporal tensor associated with the same storage without shifting as
        # a workaround.
        #
        # Issue: https://github.com/pytorch/pytorch/issues/27366
        #
        tensor = tensor.new_empty([0]).set_(tensor.storage())

        tensor.record_stream(as_cuda(stream))

2.3.4 Wait

The following method wraps CUDA's wait_stream.

  • If both streams are CUDA streams, then one stream is waiting for the other to complete.
  • Otherwise, synchronize() is used to keep the CPU waiting for CUDA to complete.

Since stream operations are asynchronous, there is no way to know whether an operation has finished when the function returns. Synchronization is therefore performed between the CPU and the device, or between CUDA streams, to make sure the GPU has completed the stream operations.

def wait_stream(source: AbstractStream, target: AbstractStream) -> None:
    """:meth:`torch.cuda.Stream.wait_stream` for either CPU or CUDA stream. It
    makes the source stream wait until the target stream completes work queued.
    """
    if is_cuda(target):
        if is_cuda(source):
            # A CUDA stream waits another CUDA stream.
            as_cuda(source).wait_stream(as_cuda(target))
        else:
            # CPU waits a CUDA stream.
            as_cuda(target).synchronize()

    # If the target is CPU, synchronization is not required.

Now that the basic stream operations are wrapped, torchgpipe uses these wrapper functions to implement the copy and wait operations, which we look at next.

Both wait_stream and synchronize end up waiting. For example, synchronize eventually calls cudaDeviceSynchronize, which blocks the CPU-side thread until the GPU has finished all previously issued CUDA work (kernels, data copies, and so on).
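The following snippet (illustrative only, not from torchgpipe) contrasts the two kinds of waiting: wait_stream inserts a GPU-side dependency without blocking the Python thread, whereas torch.cuda.synchronize blocks the CPU until the device is idle.

import torch

dev = torch.device('cuda')
s1, s2 = torch.cuda.Stream(dev), torch.cuda.Stream(dev)

with torch.cuda.stream(s1):
    a = torch.randn(4096, 4096, device=dev)
    b = a @ a                      # queued on s1

s2.wait_stream(s1)                 # GPU-side: s2 will not start until s1's queued work is done,
                                   # but this call does NOT block the Python thread.
with torch.cuda.stream(s2):
    c = b.sum()

torch.cuda.synchronize(dev)        # CPU-side: blocks this thread until the GPU has drained everything.
print(c.item())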

2.4 Copy API

The copy API is as follows; it calls the forward method of the Copy class.

def copy(batch: Batch, prev_stream: AbstractStream, next_stream: AbstractStream) -> None:
    batch[:] = Copy.apply(prev_stream, next_stream, *batch)

Copy extends torch.autograd.Function and uses record_stream to assist the copy.

class Copy(torch.autograd.Function):
    """Copies tensors on specific streams."""

    @staticmethod
    def forward(ctx: Context,  # type: ignore
                prev_stream: AbstractStream,
                next_stream: AbstractStream,
                *input: Tensor,
                ) -> Tensors:
        ctx.prev_stream = prev_stream
        ctx.next_stream = next_stream

        output = []
        output_stream = current_stream(get_device(next_stream))  # the current stream on the target device

        with use_stream(prev_stream), use_stream(next_stream):
            for x in input:
                y = x.to(get_device(next_stream))  # copy to the target device
                output.append(y)

                # 'prev_stream' is not where 'x' has been allocated.
                record_stream(x, prev_stream)      # protect x until the copy is finished
                # 'y' has been allocated on 'next_stream'.
                # It might be used on the current stream captured as 'output_stream'.
                record_stream(y, output_stream)    # protect y on the output stream

        return tuple(output)

    @staticmethod
    def backward(ctx: Context,
                 *grad_output: Tensor,
                 ) -> Tuple[Optional[Tensor], ...]:
        prev_stream = ctx.prev_stream
        next_stream = ctx.next_stream

        grad_input: Deque[Tensor] = deque(maxlen=len(grad_output))
        input_stream = current_stream(get_device(prev_stream))

        with use_stream(prev_stream), use_stream(next_stream):
            for x in reversed(grad_output):
                y = x.to(get_device(prev_stream))
                grad_input.appendleft(y)

                # 'next_stream' is not where 'x' has been allocated.
                record_stream(x, next_stream)
                # 'y' has been allocated on 'prev_stream'.
                # It might be used on the current stream captured as 'input_stream'.
                record_stream(y, input_stream)

        grad_streams: Tuple[Optional[Tensor], ...] = (None, None)
        return grad_streams + tuple(grad_input)

2.5 Wait API

wait calls the forward method of the Wait class.

def wait(batch: Batch, prev_stream: AbstractStream, next_stream: AbstractStream) -> None:
    batch[:] = Wait.apply(prev_stream, next_stream, *batch)

Wait also extends torch.autograd.Function; it applies wait_stream so that one stream waits for another to complete.

class Wait(torch.autograd.Function):
    """Synchronizes a stream to another stream.

    Place it just before you want to start an operation on the next stream,
    provided that all operations on the previous stream are done.

    """
    @staticmethod
    def forward(ctx: Context,  # type: ignore
                prev_stream: AbstractStream,
                next_stream: AbstractStream,
                *input: Tensor,
                ) -> Tensors:
        ctx.prev_stream = prev_stream
        ctx.next_stream = next_stream

        wait_stream(next_stream, prev_stream)

        return tuple(x.detach() for x in input)

    @staticmethod
    def backward(ctx: Context,
                 *grad_input: Tensor,
                 ) -> Tuple[Optional[Tensor], ...]:
        prev_stream = ctx.prev_stream
        next_stream = ctx.next_stream

        wait_stream(prev_stream, next_stream)

        grad_streams: Tuple[Optional[Tensor], ...] = (None, None)
        return grad_streams + grad_input

2.6 Usage

2.6.1 General Concepts

Let’s start with the flow diagram from a source-code comment to get the general idea.

# With checkpointing, the autograd graph looks like this diagram:
#
# ┌─────┸──────┐
# │    Copy    │
# └─────┰──────┘   (fence)
# ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
#       ┃          (compute)
# ┌─────┸──────┐
# │    Wait    │ [1] Synchronize the current stream with the copy stream.
# └─────┰──────┘
# ┌─────┸──────┐
# │ Checkpoint │ [2] Compute a partition within checkpointing.
# └─────┰──────┘
# ┌─────┸──────┐
# │    Wait    │ [3] Synchronize the copy stream with the current stream.
# └─────┰──────┘
#       ┠ ─ ─ ─ ┐
#       ┃ ┌─────┴─────┐
#       ┃ │ Recompute │ [4] Schedule the recomputation at backpropagation.
#       ┃ └─────┬─────┘
#       ┠ ─ ─ ─ ┘
#       ┃
# ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
# ┌─────┸──────┐   (fence)
# │    Copy    │
# └─────┰──────┘

2.6.2 Building Copy Streams

In the GPipe class, dedicated streams for copying are created.

def forward(self, input: TensorOrTensors) -> TensorOrTensors:  # type: ignore
    ......
    # Separate CUDA streams for copy.
    copy_streams = self._ensure_copy_streams()   # create the copy streams

    # The micro-batch index where the checkpointing stops.
    ......

    # Run pipeline parallelism.
    pipeline = Pipeline(batches,
                        self.partitions,
                        self.devices,
                        copy_streams,
                        self._skip_layout,
                        checkpoint_stop)
    pipeline.run()
    ......

The _ensure_copy_streams code is as follows; it creates a dedicated stream for each micro-batch on each device:

    def _ensure_copy_streams(self) -> List[List[AbstractStream]]:
        """Ensures that :class:`GPipe` caches CUDA streams for copy.

        It's worth to cache CUDA streams although PyTorch already manages a
        pool of pre-allocated CUDA streams, because it may reduce GPU memory
        fragementation when the number of micro-batches is small.

        """
        if not self._copy_streams:
            for device in self.devices:
                self._copy_streams.append([new_stream(device) for _ in range(self.chunks)])

        return self._copy_streams

Suppose there are 3 devices, the model is split into 3 sub-networks, and the mini-batch is split into 4 micro-batches. The layout is as follows:

In _copy_streams[i][j], i is the device index and j is the micro-batch index. This ordering will be used later.

_copy_streams
     [1][1]  [1][2]  [1][3]  [1][4]   ---->  4 copy streams of device 1 (one per micro-batch)
     [2][1]  [2][2]  [2][3]  [2][4]   ---->  4 copy streams of device 2 (one per micro-batch)
     [3][1]  [3][2]  [3][3]  [3][4]   ---->  4 copy streams of device 3 (one per micro-batch)

2.6.3 Parallel Operations

Let’s use an example to see how the streams are used to achieve parallelism.

In the run method of the Pipeline class, the following code drives the parallel execution:

def run(self) -> None:
    with spawn_workers(devices) as (in_queues, out_queues):
        for schedule in clock_cycles(m, n):
            self.fence(schedule, skip_trackers)
            self.compute(schedule, skip_trackers, in_queues, out_queues)

Before each calculation, the fence method is used to copy data from one device to the next.

2.6.4 Pre-copy

The fence method performs the pre-copy, which does the following:

  • Set the dependencies, which we analyzed in the previous article.
  • Get the copy stream of the current partition's device (next_stream).
  • Get the copy stream of the previous partition's device (prev_stream).
  • Copy the micro-batch from the previous stream to the next stream.
def fence(self,
          schedule: List[Tuple[int, int]],
          skip_trackers: List[SkipTrackerThroughPotals],
          ) -> None:
    """Copies micro-batches after computation for the previous
    micro-batches.
    """
    batches = self.batches
    copy_streams = self.copy_streams
    skip_layout = self.skip_layout

    for i, j in schedule:
        # Ensure that batches[i-1] is executed after batches[i] in
        # backpropagation by an explicit dependency.
        if i != 0:
            depend(batches[i-1], batches[i])

        next_stream = copy_streams[j][i]        # the copy stream of the current partition

        for prev_j, ns, name in skip_layout.copy_policy(j):
            prev_stream = copy_streams[prev_j][i]
            skip_trackers[i].copy(batches[i], prev_stream, next_stream, ns, name)

        if j != 0:
            prev_stream = copy_streams[j-1][i]  # the copy stream of the previous partition
            copy(batches[i], prev_stream, next_stream)   # pre-copy the micro-batch to the next stream

Following the example from the previous article, here is a schedule generation sequence.

m = 4  # m: number of micro-batches
n = 3  # n: number of partitions
for k in range(m + n - 1):
    print([(k - j + 1, j + 1) for j in range(max(1 + k - m, 0), min(1 + k, n))])

# Output: the schedule (micro-batch index, partition index) for each clock cycle
#
# [(1, 1)]                    # round 1 training plan & data
# [(2, 1), (1, 2)]            # round 2 training plan & data
# [(3, 1), (2, 2), (1, 3)]    # round 3 training plan & data
# [(4, 1), (3, 2), (2, 3)]    # round 4 training plan & data
# [(4, 2), (3, 3)]            # round 5 training plan & data
# [(4, 3)]                    # round 6 training plan & data
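For reference, the clock_cycles generator in torchgpipe/pipeline.py looks roughly like this (its indices are 0-based, while the printout above shifts them by 1 for readability):

from typing import Iterable, List, Tuple

def clock_cycles(m: int, n: int) -> Iterable[List[Tuple[int, int]]]:
    """Generates schedules for each clock cycle."""
    # m: number of micro-batches
    # n: number of partitions
    # i: index of micro-batch
    # j: index of partition
    # k: clock number
    for k in range(m + n - 1):
        yield [(k - j, j) for j in range(max(1 + k - m, 0), min(1 + k, n))]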

The first six clock cycles correspond to the following timeline: in the first clock cycle, (1,1) enters the system; in the second clock cycle, (2,1) enters the system, and so on.

           +----------+----------+----------+----------+----------+----------+
  cuda:0   |  (1,1)   |  (2,1)   |  (3,1)   |  (4,1)   |          |          |
           |          |          |          |          |          |          |
  cuda:1   |          |  (1,2)   |  (2,2)   |  (3,2)   |  (4,2)   |          |
           |          |          |          |          |          |          |
  cuda:2   |          |          |  (1,3)   |  (2,3)   |  (3,3)   |  (4,3)   |
           +----------+----------+----------+----------+----------+----------+
             clock 1    clock 2    clock 3    clock 4    clock 5    clock 6

  ------------------------------------------------------------------------------>  Time

Now let’s look at the schedules, focusing on the copies that happen in the third clock cycle.

The second clock cycle completes the following operations:

[(2, 1), (1, 2)]            # round 2 training plan & data

The plan for the third clock cycle is as follows:

[(3, 1), (2, 2), (1, 3)]    # round 3 training plan & data

For each (i, j) in the schedule, fence copies copy_streams[j-1][i] to copy_streams[j][i].

Note that in _copy_streams[i][j], i is the device index and j is the micro-batch index, which is the opposite of the (i, j) order used in the schedule.

           +----------+----------+----------+----------+----------+----------+
  cuda:0   |  (1,1)   |  (2,1)   |  (3,1)   |  (4,1)   |          |          |
           |          |        \ |          |          |          |          |
  cuda:1   |          |  (1,2)   |  (2,2)   |  (3,2)   |  (4,2)   |          |
           |          |        \ |          |          |          |          |
  cuda:2   |          |          |  (1,3)   |  (2,3)   |  (3,3)   |  (4,3)   |
           +----------+----------+----------+----------+----------+----------+
             clock 1    clock 2    clock 3    clock 4    clock 5    clock 6

  Before clock 3 starts, the output of (2,1) is copied from cuda:0 to cuda:1 (for (2,2)),
  and the output of (1,2) is copied from cuda:1 to cuda:2 (for (1,3)).

  ------------------------------------------------------------------------------>  Time

2.6.5 Computation

The compute method performs the following steps:

  • Use wait(batch, copy_streams[j][i], streams[j]) to make the compute stream wait for the copy stream, ensuring the copy has finished before computing.
  • Then perform the computation.
  • Use wait(batch, streams[j], copy_streams[j][i]) to make the copy stream wait for the compute stream, ensuring the computation has finished before the output is copied.

Details are as follows:

def compute(self,
            schedule: List[Tuple[int, int]],
            skip_trackers: List[SkipTrackerThroughPotals],
            in_queues: List[InQueue],
            out_queues: List[OutQueue],
            ) -> None:
    """Runs tasks with synchronization to copy streams."""
    batches = self.batches
    partitions = self.partitions
    devices = self.devices
    copy_streams = self.copy_streams
    checkpoint_stop = self.checkpoint_stop

    n = len(partitions)
    streams = [current_stream(d) for d in devices]
    exc_info: Optional[ExcInfo] = None

    # With checkpointing, the autograd graph looks like this diagram:
    # ┌─────┸──────┐
    # │    Copy    │
    # └─────┰──────┘   (fence)
    # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
    #       ┃          (compute)
    # ┌─────┸──────┐
    # │    Wait    │ [1] Synchronize the current stream with the copy stream.
    # └─────┰──────┘
    # ┌─────┸──────┐
    # │ Checkpoint │ [2] Compute a partition within checkpointing.
    # └─────┰──────┘
    # ┌─────┸──────┐
    # │    Wait    │ [3] Synchronize the copy stream with the current stream.
    # └─────┰──────┘
    #       ┠ ─ ─ ─ ┐
    #       ┃ ┌─────┴─────┐
    #       ┃ │ Recompute │ [4] Schedule the recomputation at backpropagation.
    #       ┃ └─────┬─────┘
    #       ┠ ─ ─ ─ ┘
    #       ┃
    # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
    # ┌─────┸──────┐   (fence)
    # │    Copy    │
    # └─────┰──────┘
    for i, j in schedule:
        batch = batches[i]
        partition = partitions[j]

        # Synchronize with the copied input. ([1] in the diagram)
        if j != 0:
            wait(batch, copy_streams[j][i], streams[j])

        # Determine whether checkpointing or not.
        checkpoint = (i < checkpoint_stop)
        if checkpoint:
            def function(input: TensorOrTensors,
                         partition: nn.Sequential = partition,
                         skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                         ) -> TensorOrTensors:
                with use_skip_tracker(skip_tracker):
                    return partition(input)

            chk = Checkpointing(function, batch)
            task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)
            del function, chk

        else:
            def compute(batch: Batch = batch,
                        partition: nn.Sequential = partition,
                        skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                        ) -> Batch:
                with use_skip_tracker(skip_tracker):
                    return batch.call(partition)

            task = Task(streams[j], compute=compute, finalize=None)
            del compute

        # Compute tasks in parallel. ([2] in the diagram)
        in_queues[j].put(task)

    for i, j in schedule:
        ok, payload = out_queues[j].get()

        # Hold the first exception.
        if exc_info is not None:
            continue
        elif not ok:
            exc_info = cast(ExcInfo, payload)
            continue

        task, batch = cast(Tuple[Task, Batch], payload)

        # The copy stream synchronizes to copy the output. ([3] in the diagram)
        if j != n-1:
            wait(batch, streams[j], copy_streams[j][i])

        # Finalize tasks. If checkpointing is enabled, here the
        # recomputation is scheduled at backpropagation. ([4] in the diagram)
        with use_device(devices[j]):
            task.finalize(batch)

        batches[i] = batch

    # Fail at the first exception.
    if exc_info is not None:
        raise exc_info[0].with_traceback(exc_info[1], exc_info[2])

The corresponding part of the paper is:

0x03 Recomputation

Next we look at recomputation, which the paper discusses under "Autograd Functions with Shared Memory".

Since a similar mechanism was already covered in the GPipe articles, the analysis here is brief and included mainly for completeness.

3.1 Analysis

Due to the Nuggets platform's recent poor formula support, this part is only provided as images.

3.2 Wrapper API

torchgpipe/checkpoint.py provides a checkpoint function, which offers a simple API.

def checkpoint(function: Function, input: TensorOrTensors) -> TensorOrTensors:
    """Makes a checkpoint with a simple interface like
    :func:`torch.utils.checkpoint.checkpoint`. It's only used to test or debug
    :class:`Checkpoint` and :class:`Recompute` without boilerplate.
    """
    batch = Batch(input)

    chk = Checkpointing(function, batch)
    batch = chk.checkpoint()
    chk.recompute(batch)

    return batch.tensor_or_tensors

See tests/test_checkpoint.py for details. Through clever log printing, it shows at runtime how checkpointing behaves in forward and backward propagation.

The final timeline is ["a:forward", "b:forward", "b:forward", "b:backward", "a:forward", "a:backward"].

The three groups correspond to the forward pass, Checkpoint(Log[b]), and Checkpoint(Log[a]) respectively.

@pytest.mark.parametrize("device", devices)
def test_serial_checkpoints(device):
    # Copied from https://github.com/pytorch/pytorch/pull/18568.
    timeline = []

    class Log(torch.autograd.Function):
        @staticmethod
        def forward(ctx, name, x):
            ctx.name = name
            timeline.append(f"{name}:forward")
            return x.detach()

        @staticmethod
        def backward(ctx, grad_output):
            name = ctx.name
            timeline.append(f"{name}:backward")
            return None, grad_output

    a = torch.rand(1, device=device, requires_grad=True)
    b = torch.rand(1, device=device, requires_grad=True)

    # Increase the next function sequence number.
    _ = a + 1 + 2 + 3 + 4 + 5

    # This means the final backward will actually run "a:forward", "a:backward".
    a = checkpoint(partial(Log.apply, "a"), a)

    a, phony = fork(a)
    b = join(b, phony)

    # This means the final backward will actually run "b:forward", "b:backward".
    b = checkpoint(partial(Log.apply, "b"), b)

    c = torch.cat((a, b))

    out = c.sum()

    #                        +--> {a} --Checkpoint(Log)--> {a}
    # {out} --Sum--> {c} --Cat     ^-----------------------------+
    #                        +--> {b} --Checkpoint(Log)--> {b} --First--> {b}
    out.backward()

    assert timeline == ["a:forward", "b:forward", "b:forward", "b:backward", "a:forward", "a:backward"]
    #    |----------------------|  |-----------------------|  |-----------------------|
    #          forward pass            Checkpoint(Log[b])         Checkpoint(Log[a])

The checkpoint API calls Checkpointing, so let’s look at its implementation.

Checkpointing provides two methods, checkpoint and recompute, which invoke the Checkpoint and Recompute classes respectively.

class Checkpointing:
    """Generates a pair of :class:`Checkpoint` and :class:`Recompute`."""

    def __init__(self, function: Function, batch: Batch) -> None:
        self.function = function
        self.batch = batch

        # Shared memory between Checkpoint and Recompute. 1-length deque is
        # used for mutability and length limitation.
        self.recomputed: Deque[Recomputed] = deque(maxlen=1)
        self.rng_states: Deque[RNGStates] = deque(maxlen=1)

    def checkpoint(self) -> Batch:
        """Returns a batch applied by :class:`Checkpoint`."""
        input_atomic = self.batch.atomic
        input = tuple(self.batch)

        # Use a phony which requires grad to ensure that Checkpoint can be
        # tracked by the autograd engine even when none of the input tensors
        # require grad.
        phony = get_phony(self.batch[0].device, requires_grad=True)

        output = Checkpoint.apply(phony, self.recomputed, self.rng_states,
                                  self.function, input_atomic, *input)
        return Batch(output)

    def recompute(self, batch: Batch) -> None:
        """Applies :class:`Recompute` to the batch in place."""
        input_atomic = self.batch.atomic
        input = tuple(self.batch)

        # batch[0] is always requiring grad, because it has been passed
        # checkpoint with a phony requiring grad.
        batch[0], phony = fork(batch[0])
        phony = Recompute.apply(phony, self.recomputed, self.rng_states,
                                self.function, input_atomic, *input)
        batch[0] = join(batch[0], phony)

3.3 Implementation

Checkpoint, together with the Recompute below, splits ordinary checkpointing into two phases (the forward computation is split into two pieces, and so is the backward computation), which makes better use of the pipeline.

The corresponding part of the paper is:

3.3.1 Checkpoint

class Checkpoint(torch.autograd.Function):
    @staticmethod
    # type: ignore[override]
    def forward(ctx: Context,
                phony: Tensor,
                recomputed: Deque[Recomputed],
                rng_states: Deque[RNGStates],
                function: Function,
                input_atomic: bool,
                *input: Tensor,
                ) -> TensorOrTensors:
        ctx.recomputed = recomputed
        ctx.rng_states = rng_states

        # Save the RNG state so that the recomputation is reproducible.
        save_rng_states(input[0].device, ctx.rng_states)

        ctx.function = function
        ctx.input_atomic = input_atomic
        # Save the input for backward.
        ctx.save_for_backward(*input)

        # Run the forward pass without building the autograd graph.
        with torch.no_grad(), enable_checkpointing():
            output = function(input[0] if input_atomic else input)

        return output

    @staticmethod
    def backward(ctx: Context,
                 *grad_output: Tensor,
                 ) -> Tuple[Optional[Tensor], ...]:
        # Pop the output and detached inputs that Recompute stored during its backward.
        output, input_leaf = ctx.recomputed.pop()

        if isinstance(output, tuple):
            tensors = output
        else:
            tensors = (output,)

        if any(y.requires_grad for y in tensors):
            tensors = tuple([x for x in tensors if x.requires_grad])
            # Run backward on the recomputed graph.
            torch.autograd.backward(tensors, grad_output)

        grad_input: List[Optional[Tensor]] = [None, None, None, None, None]
        grad_input.extend(x.grad for x in input_leaf)
        return tuple(grad_input)

3.3.2 Recompute

Recompute means recalculating the intermediate variables based on the stored information.

class Recompute(torch.autograd.Function):
    @staticmethod
    # type: ignore[override]
    def forward(ctx: Context,
                phony: Tensor,
                recomputed: Deque[Recomputed],
                rng_states: Deque[RNGStates],
                function: Function,
                input_atomic: bool,
                *input: Tensor,
                ) -> Tensor:
        ctx.recomputed = recomputed
        ctx.rng_states = rng_states

        ctx.function = function
        ctx.input_atomic = input_atomic
        ctx.save_for_backward(*input)

        return phony

    @staticmethod
    def backward(ctx: Context, *grad_output: Tensor) -> Tuple[None, ...]:
        input = ctx.saved_tensors
        input_leaf = tuple(x.detach().requires_grad_(x.requires_grad) for x in input)

        # Restore the RNG state saved in the forward pass, then rerun the
        # forward computation to rebuild the intermediate activations.
        with restore_rng_states(input[0].device, ctx.rng_states):
            with torch.enable_grad(), enable_recomputing():
                output = ctx.function(input_leaf[0] if ctx.input_atomic else input_leaf)

        # Share the recomputed output and inputs with Checkpoint.backward.
        ctx.recomputed.append((output, input_leaf))

        grad_input: List[None] = [None, None, None, None, None]
        grad_input.extend(None for _ in ctx.saved_tensors)
        return tuple(grad_input)

3.4 Overall Invocation

The overall call code is as follows:

def compute(self,
            schedule: List[Tuple[int, int]],
            skip_trackers: List[SkipTrackerThroughPotals],
            in_queues: List[InQueue],
            out_queues: List[OutQueue],
            ) -> None:
    """Runs tasks with synchronization to copy streams."""
    batches = self.batches
    partitions = self.partitions
    devices = self.devices
    copy_streams = self.copy_streams
    checkpoint_stop = self.checkpoint_stop

    n = len(partitions)
    streams = [current_stream(d) for d in devices]
    exc_info: Optional[ExcInfo] = None

    # With checkpointing, the autograd graph looks like this diagram:
    # ┌─────┸──────┐
    # │    Copy    │
    # └─────┰──────┘   (fence)
    # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
    #       ┃          (compute)
    # ┌─────┸──────┐
    # │    Wait    │ [1] Synchronize the current stream with the copy stream.
    # └─────┰──────┘
    # ┌─────┸──────┐
    # │ Checkpoint │ [2] Compute a partition within checkpointing.
    # └─────┰──────┘
    # ┌─────┸──────┐
    # │    Wait    │ [3] Synchronize the copy stream with the current stream.
    # └─────┰──────┘
    #       ┠ ─ ─ ─ ┐
    #       ┃ ┌─────┴─────┐
    #       ┃ │ Recompute │ [4] Schedule the recomputation at backpropagation.
    #       ┃ └─────┬─────┘
    #       ┠ ─ ─ ─ ┘
    #       ┃
    # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
    # ┌─────┸──────┐   (fence)
    # │    Copy    │
    # └─────┰──────┘
    for i, j in schedule:
        batch = batches[i]
        partition = partitions[j]

        # Synchronize with the copied input. ([1] in the diagram)
        if j != 0:
            wait(batch, copy_streams[j][i], streams[j])

        # Determine whether checkpointing or not.
        checkpoint = (i < checkpoint_stop)
        if checkpoint:
            def function(input: TensorOrTensors,
                         partition: nn.Sequential = partition,
                         skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                         ) -> TensorOrTensors:
                with use_skip_tracker(skip_tracker):
                    return partition(input)

            chk = Checkpointing(function, batch)
            task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)
            del function, chk

        else:
            def compute(batch: Batch = batch,
                        partition: nn.Sequential = partition,
                        skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                        ) -> Batch:
                with use_skip_tracker(skip_tracker):
                    return batch.call(partition)

            task = Task(streams[j], compute=compute, finalize=None)
            del compute

        # Compute tasks in parallel. ([2] in the diagram)
        in_queues[j].put(task)

    for i, j in schedule:
        ok, payload = out_queues[j].get()

        # Hold the first exception.
        if exc_info is not None:
            continue
        elif not ok:
            exc_info = cast(ExcInfo, payload)
            continue

        task, batch = cast(Tuple[Task, Batch], payload)

        # The copy stream synchronizes to copy the output. ([3] in the diagram)
        if j != n-1:
            wait(batch, streams[j], copy_streams[j][i])

        # Finalize tasks. If checkpointing is enabled, here the
        # recomputation is scheduled at backpropagation. ([4] in the diagram)
        with use_device(devices[j]):
            task.finalize(batch)

        batches[i] = batch

    # Fail at the first exception.
    if exc_info is not None:
        raise exc_info[0].with_traceback(exc_info[1], exc_info[2])

This completes the analysis of PyTorch pipeline parallelism. Our next plan is to systematically go through PyTorch parallel training, starting with the basics of gradients.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF References

Markdown formula

Formula editing tutorial in Markdown

Docs.nvidia.com/cuda/cuda-r…

CUDA Learning: Basics Summary

Use of CUDA Stream

NVIDIA solution architect in-depth parsing of large-scale parametric language model Megatron-Bert

Accelerating Wide & Deep Recommender Inference on GPUs

HugeCTR: High-Performance Click-Through Rate Estimation Training

Discuss.pytorch.org/t/how-to-pr…

Github.com/NVIDIA/apex…

Github.com/justheurist…

Pytorch.org/tutorials/i…

Pytorch.org/docs/stable…

Pytorch.org/docs/notes/…

zhuanlan.zhihu.com/p/61765561

Pytorch.apachen.org/docs/1.7/64…

Zhidx.com/p/217999.ht…