0x00 Abstract
In previous articles we covered the basics of PyTorch pipeline parallelism and automatic balancing. In this article we look at how the data is split into micro-batches and how the runtime system works.
Other articles in this pipeline-parallelism series are linked below:
Deep learning pipeline parallelism GPipe (1) -- basic pipeline implementation
Deep learning pipeline parallelism GPipe (2) -- gradient accumulation
Deep learning pipeline parallelism GPipe (3) -- recomputation
Deep learning pipeline parallelism PipeDream (1) -- profile stage
Deep learning pipeline parallelism PipeDream (2) -- computing partitions
Deep learning pipeline parallelism PipeDream (3) -- model transformation
Deep learning pipeline parallelism PipeDream (4) -- runtime engine
Deep learning pipeline parallelism PipeDream (5) -- communication module
Deep learning pipeline parallelism PipeDream (6) -- 1F1B strategy
PyTorch pipeline parallelism implementation (1) -- basics
PyTorch pipeline parallelism implementation (2) -- how to divide the model
With that background in place, let's look at how torchgpipe's runtime works.
0x01 Splitting mini-batches
Let's first look at how a mini-batch is split into multiple micro-batches.
1.1 Usage
As the sample code below shows, the scatter method is used to do the splitting.
# Divide a mini-batch into micro-batches.
batches = microbatch.scatter(input, self.chunks)

# Run pipeline parallelism.
pipeline = Pipeline(batches,
                    self.partitions,
                    self.devices,
                    copy_streams,
                    self._skip_layout,
                    checkpoint_stop)
pipeline.run()

# Merge the micro-batches into one mini-batch.
output = microbatch.gather(batches)
return output
1.2 PyTorch basics
Let’s start with some basic code for PyTorch.
1.2.1 chunk
The chunk method splits a tensor into chunks and returns a list of tensors. Its parameters are:
- tensor: the tensor to be split.
- chunks: the number of chunks to return.
- dim: the dimension along which to split.
Specific examples are as follows:
import numpy as np
import torch

data = torch.from_numpy(np.random.rand(3, 5))
print(str(data))

for i, data_i in enumerate(data.chunk(3, 0)):
    print(str(data_i))

The output is:

tensor([[0.1208, 0.3428, 0.4586, 0.9372, 0.6410],
        [0.7889, 0.4480, 0.7607, 0.7903, 0.4118],
        [0.8391, 0.6649, 0.8338, 0.3477, 0.3953]], dtype=torch.float64)
tensor([[0.1208, 0.3428, 0.4586, 0.9372, 0.6410]], dtype=torch.float64)
tensor([[0.7889, 0.4480, 0.7607, 0.7903, 0.4118]], dtype=torch.float64)
tensor([[0.8391, 0.6649, 0.8338, 0.3477, 0.3953]], dtype=torch.float64)
1.2.2 cat
The cat method concatenates tensors, or a sequence of tensors, along a given dimension:

Z = torch.cat((X, Y), 0)
Let’s use an example:
X = torch.ones(2, 5)
Y = torch.ones(4, 5)
Z = torch.cat((X, Y), 0)
print(Z)
The result is:
tensor([[1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.]])
1.3 Splitting & gathering
Coming back to how batches are split, let's look at the scatter code.
def scatter(input: TensorOrTensors, chunks: int) -> List[Batch]:
    """Splits an input mini-batch into multiple micro-batches."""
    inputs: Iterable[TensorOrTensors]

    if isinstance(input, Tensor):
        inputs = input.chunk(chunks)          # a single tensor is chunked directly
    else:
        rotated: List[Tensors] = []

        for tensor in input:                  # for a tuple of tensors,
            tensors = tensor.chunk(chunks)    # chunk each tensor separately
            rotated.append(cast(Tensors, tensors))

        inputs = zip(*rotated)                # regroup the chunks into per-micro-batch tuples

    return [Batch(x) for x in inputs]         # wrap each micro-batch in a Batch
The gather method is the inverse of scatter: it concatenates the output micro-batches back into a mini-batch.
def gather(outputs: List[Batch]) -> TensorOrTensors:
"""Concatenates output micro-batches into a mini-batch."""
output: TensorOrTensors
if outputs[0].atomic:
tensors = tuple(b.tensor for b in outputs)
output = torch.cat(tensors)
else:
rotated = [b.tensors for b in outputs]
output_buf = []
for tensors in zip(*rotated):
output_buf.append(torch.cat(tensors))
output = tuple(output_buf)
return output
1.4 Analysis
Let's see how this works. The test below splits the tuple of tensors ab into two micro-batches.
def test_scatter_tuple():
ab = (torch.ones(2, 1), torch.zeros(4, 2), torch.zeros(6, 3))
a, b = scatter(ab, chunks=2)
assert a.tensors[0].size() == (1, 1)
assert b.tensors[0].size() == (1, 1)
assert a.tensors[1].size() == (2, 2)
assert b.tensors[1].size() == (2, 2)
assert a.tensors[2].size() == (3, 3)
assert b.tensors[2].size() == (3, 3)
This can be pictured as follows:
+---------------------------------------------------------------+
| ab                                                            |
|                                                               |
|   +-----+      +---------+      +------------+                |
|   |  1  |      |  0   0  |      |  0   0   0 |                |
|   |  1  |      |  0   0  |      |  0   0   0 |                |
|   +-----+      |  0   0  |      |  0   0   0 |                |
|                |  0   0  |      |  0   0   0 |                |
|                +---------+      |  0   0   0 |                |
|                                 |  0   0   0 |                |
|                                 +------------+                |
+-------------------------------+-------------------------------+
                                |
                                |  a, b = scatter(ab, chunks=2)
                                |
                +---------------+-----------------+
                v                                 v
+-------------------------------+  +-------------------------------+
| a                             |  | b                             |
|                               |  |                               |
|  +-----+ +---------+ +-------+|  |  +-----+ +---------+ +-------+|
|  |  1  | |  0   0  | | 0 0 0 ||  |  |  1  | |  0   0  | | 0 0 0 ||
|  +-----+ |  0   0  | | 0 0 0 ||  |  +-----+ |  0   0  | | 0 0 0 ||
|          +---------+ | 0 0 0 ||  |          +---------+ | 0 0 0 ||
|                      +-------+|  |                      +-------+|
+-------------------------------+  +-------------------------------+
The sample code below shows how micro-batches are gathered back into a mini-batch.
def test_gather_tensors():
a = torch.zeros(1, 1)
b = torch.zeros(1, 1)
ab = gather([Batch(a), Batch(b)])
assert ab.size() == (2, 1)
def test_gather_tuples():
a = (torch.zeros(1, 1), torch.zeros(2, 2))
b = (torch.zeros(1, 1), torch.zeros(2, 2))
ab = gather([Batch(a), Batch(b)])
assert isinstance(ab, tuple)
assert ab[0].size() == (2, 1)
assert ab[1].size() == (4, 2)
0x02 Runtime system
Let's look at some of the runtime infrastructure: Stream, Task, and Worker.
2.1 Stream
The Stream class is used to encapsulate CUDA streams and CPU streams. The code is located at: torchgpipe/stream.py.
A CUDA stream represents a GPU operation queue, that is, a sequence of kernels bound to a device and executed in order. We can think of a stream as a task queue on the GPU: the user adds a series of operations to the stream, and the GPU executes them in the order they were added. Within the same stream all operations are serialized, so they never run in parallel; for two operations to run in parallel they must be placed in different streams. Kernels in different streams can be interleaved and may even overlap.
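To make this concrete, here is a minimal sketch in plain PyTorch (not torchgpipe code): work queued on two different streams s1 and s2 may overlap, while work issued on the same stream always runs in order.

import torch

if torch.cuda.is_available():
    s1 = torch.cuda.Stream()
    s2 = torch.cuda.Stream()
    a = torch.randn(4096, 4096, device='cuda')

    with torch.cuda.stream(s1):
        b = a.mm(a)        # queued on stream s1
    with torch.cuda.stream(s2):
        c = a.add(1.0)     # queued on stream s2, may overlap with the kernel on s1

    torch.cuda.synchronize()   # wait for both streams before using b and c

torchgpipe wraps both CUDA streams and a CPU placeholder behind the following abstraction: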
class CPUStreamType:
pass
# The placeholder on place of streams for the CPU device instead of CUDA.
CPUStream = CPUStreamType()
# It represents both CUDA streams and the CPU stream.
AbstractStream = Union[torch.cuda.Stream, CPUStreamType]
torch.cuda.stream(stream) is a context manager that selects the given stream: CUDA work issued inside the with block is queued on that stream.
The related operation used in this article is use_stream.
@contextmanager
def use_stream(stream: AbstractStream) -> Generator[None, None, None]:
""":func:`torch.cuda.stream` for either CPU or CUDA stream."""
if not is_cuda(stream):
yield
return
with torch.cuda.stream(as_cuda(stream)):
yield
def is_cuda(stream: AbstractStream) -> bool:
"""Returns ``True`` if the given stream is a valid CUDA stream."""
return stream is not CPUStream
def as_cuda(stream: AbstractStream) -> torch.cuda.Stream:
"""Casts the given stream as :class:`torch.cuda.Stream`."""
return cast(torch.cuda.Stream, stream)
2.2 Task
A Task represents how to compute a micro-batch on a partition. It consists of two parts:

- compute, which should be executed concurrently in a worker thread;
- finalize, which should be executed after the worker thread has finished compute.
A Task can be understood as a unit of business logic. Readers with Android experience can think of it as similar to a Message, which likewise encapsulates both the information the task carries and the handler that will process it.
When a Task is constructed, the compute and finalize callables are passed in, for example:
task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)
Or as follows:
def compute(batch: Batch = batch,
partition: nn.Sequential = partition,
skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
) -> Batch:
with use_skip_tracker(skip_tracker):
return batch.call(partition)
task = Task(streams[j], compute=compute, finalize=None)
The Task class is defined as follows. Note that a Task is bound to a Stream, which means a Task can run on any device.
class Task:
    """A task represents how to compute a micro-batch on a partition.

    It consists of two parts: :meth:`compute` and :meth:`finalize`.
    :meth:`compute` should be executed in worker threads concurrently.
    :meth:`finalize` should be executed after when worker threads complete to
    execute :meth:`compute`.

    :meth:`compute` might be boosted by worker threads. Because it produces
    several CUDA API calls by user code. In PyTorch, parallel CUDA API calls
    are not serialized through GIL. So more than one CUDA API call can be
    produced at the same time.

    """

    def __init__(self,
                 stream: AbstractStream,
                 *,
                 compute: Callable[[], Batch],
                 finalize: Optional[Callable[[Batch], None]],
                 ) -> None:
        self.stream = stream
        self._compute = compute
        self._finalize = finalize

    def compute(self) -> Batch:
        return self._compute()           # call the compute callable passed in

    def finalize(self, batch: Batch) -> None:
        if self._finalize is None:
            return
        with use_stream(self.stream):    # bind to the task's stream
            self._finalize(batch)        # call the finalize callable passed in
2.3 Worker
A worker runs tasks; each device has one worker responsible for executing the tasks on that device. Readers with Android experience can think of it as a Looper.

Note that worker is just a function; to actually run, it needs a thread to host it, which is what spawn_workers does next.
def worker(in_queue: InQueue,
           out_queue: OutQueue,
           device: torch.device,
           grad_mode: bool,
           ) -> None:
    """The main loop of a worker thread."""
    torch.set_grad_enabled(grad_mode)

    with use_device(device):
        while True:
            task = in_queue.get()         # fetch a task from the input queue

            if task is None:
                break

            try:
                batch = task.compute()    # run the task
            except Exception:
                exc_info = cast(ExcInfo, sys.exc_info())
                out_queue.put((False, exc_info))
                continue

            out_queue.put((True, (task, batch)))   # put the result into the output queue

    done = (False, None)
    out_queue.put(done)
2.4 Generating workers
spawn_workers is decorated with @contextmanager, which produces an object implementing the context-management protocol. Context managers are used to save and restore global state, close files, and so on; they provide an easy-to-use wrapper around try...finally.
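As a minimal sketch of the pattern (unrelated to torchgpipe itself; the helper name managed_resource is made up for illustration): the code before yield runs on entry to the with block, and the finally clause runs on exit, which is exactly how spawn_workers cleans up its worker threads.

from contextlib import contextmanager

@contextmanager
def managed_resource(name: str):
    print(f"acquire {name}")      # runs when the with block is entered
    try:
        yield name                # the value bound by "with ... as"
    finally:
        print(f"release {name}")  # always runs when the with block exits

with managed_resource("demo") as r:
    print(f"use {r}")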
spawn_workers generates one thread per device, and each of these threads runs the worker function.
spawn_workers not only generates the workers but also a pair of queue lists (in_queues, out_queues) that exist throughout the Pipeline's life cycle. Specifically:
- spawn_workers internally generates an in_queue and an out_queue for each device, which guarantees that operations on each device are executed serially:
in_queue, out_queue = workers[device]
- These queues are appended to (in_queues, out_queues):
in_queues.append(in_queue)
out_queues.append(out_queue)
- (in_queues, out_queues) is then used as the context through which the Pipeline exchanges task information.
- The order of in_queues follows the order of the devices, i.e. the partitions, and the same holds for out_queues.
The specific code is as follows:
@contextmanager
def spawn_workers(devices: List[torch.device],
                  ) -> Generator[Tuple[List[InQueue], List[OutQueue]], None, None]:
    """Spawns worker threads. A worker thread is bound to a device."""
    in_queues: List[InQueue] = []
    out_queues: List[OutQueue] = []

    # Spawn workers.
    workers: Dict[torch.device, Tuple[InQueue, OutQueue]] = {}

    def normalize_device(device: torch.device) -> torch.device:
        if device.type == 'cuda' and device.index is None:
            return torch.device('cuda', index=torch.cuda.current_device())

        if device.type == 'cpu' and device.index is not None:
            return torch.device('cpu')

        return device

    for device in devices:
        device = normalize_device(device)       # normalize the device

        try:
            in_queue, out_queue = workers[device]   # reuse the queues if this device already has them
        except KeyError:                            # otherwise create new queues and a new thread
            in_queue = Queue()
            out_queue = Queue()
            workers[device] = (in_queue, out_queue)

            t = Thread(target=worker,               # the thread runs the worker function
                       args=(in_queue, out_queue, device, torch.is_grad_enabled()),
                       daemon=True,
                       )
            t.start()                               # start the worker thread

        in_queues.append(in_queue)                  # collect the queues
        out_queues.append(out_queue)

    try:
        yield (in_queues, out_queues)               # hand the queues back to the caller
    finally:
        # Close workers.
        for in_queue in set(in_queues):
            in_queue.put(None)

        # Join running workers.
        running = set(out_queues)
        while running:
            out_queue = running.pop()
            ok, payload = out_queue.get()

            done = (False, None)
            if (ok, payload) == done:
                continue

            running.add(out_queue)
2.5 Usage
2.5.1 When workers are generated
The example is in torchgpipe/pipeline.py: the workers are generated in the run function of the Pipeline class. As we can see, what matters to the Pipeline are the queues (in_queues, out_queues).
def run(self) -> None:
    """Runs pipeline parallelism.

    It modifies the given batches in place.

    """
    batches = self.batches
    partitions = self.partitions
    devices = self.devices
    skip_layout = self.skip_layout

    m = len(batches)
    n = len(partitions)
    skip_trackers = [SkipTrackerThroughPotals(skip_layout) for _ in batches]

    with spawn_workers(devices) as (in_queues, out_queues):   # create the workers and their queues
        for schedule in clock_cycles(m, n):                   # run one clock cycle at a time
            self.fence(schedule, skip_trackers)
            self.compute(schedule, skip_trackers,
                         in_queues, out_queues)               # pass the queues in
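The schedule above comes from clock_cycles(m, n). As a sketch of the idea (written here for illustration; consistent with the behaviour the code above relies on, but not necessarily a verbatim copy of torchgpipe's source): in clock cycle k, micro-batch i may run on partition j exactly when i + j == k, which produces the familiar diagonal wavefront.

from typing import Iterable, List, Tuple

def clock_cycles(m: int, n: int) -> Iterable[List[Tuple[int, int]]]:
    """Yield, for each clock cycle, the (micro-batch, partition) pairs that may run."""
    # m micro-batches, n partitions; the pair (i, j) runs in cycle k = i + j.
    for k in range(m + n - 1):
        yield [(k - j, j) for j in range(max(1 + k - m, 0), min(1 + k, n))]

for k, schedule in enumerate(clock_cycles(3, 3)):
    print(k, schedule)

# 0 [(0, 0)]
# 1 [(1, 0), (0, 1)]
# 2 [(2, 0), (1, 1), (0, 2)]
# 3 [(2, 1), (1, 2)]
# 4 [(2, 2)]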
2.5.2 Analysis
Torchgpipe uses Python’s Queue data structure.
The Queue class implements a basic first-in, first-out (FIFO) container; its documentation describes it as "a multi-producer, multi-consumer queue."
The main methods are listed below; a small usage sketch follows the list.

- Queue.get([block, [timeout]]): reads from the queue, removing an element from the head of the queue.
- Queue.put(item, [block, [timeout]]): writes to the queue, adding an element to the tail of the queue.
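Here is the usage sketch promised above (illustrative only, not torchgpipe code; the names q and consumer are made up): one thread blocks on get() until another thread put()s an item, and None is used as a shutdown signal, just as torchgpipe's workers do.

import queue
import threading

q = queue.Queue()

def consumer() -> None:
    while True:
        item = q.get()        # blocks until an item is available
        if item is None:      # None signals shutdown
            break
        print(f"got {item}")

t = threading.Thread(target=consumer, daemon=True)
t.start()

for i in range(3):
    q.put(i)                  # add items to the tail of the queue
q.put(None)                   # ask the consumer to exit
t.join()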
I’m more used to thinking of (in_queues, out_queues) as Linux-like pipes.
A Linux pipe is a basic IPC mechanism that works between related processes to transfer data. Its main features are as follows (a small sketch follows the list):
- A pipe is a FIFO file managed by the kernel. It is essentially a buffer, like a pipe laid in memory: two processes sit at the two ends of the pipe and pass information through it.
- One end of the pipe is connected to the output of one process, which puts information into the pipe. When the pipe is full, the process trying to write waits until the process at the other end takes something out.
- The other end of the pipe is connected to the input of another process, which takes out the information put into the pipe. When the pipe is empty, the process reading from it waits until the process at the other end puts something in.
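Here is the minimal sketch mentioned above (illustrative only, POSIX-specific, not related to torchgpipe): a parent process writes into a pipe and a child process blocks on the read end until data arrives.

import os

r, w = os.pipe()              # r: read end, w: write end
pid = os.fork()

if pid == 0:                  # child process: the reading side
    os.close(w)
    data = os.read(r, 1024)   # blocks until the parent writes
    print(f"child read: {data!r}")
    os._exit(0)
else:                         # parent process: the writing side
    os.close(r)
    os.write(w, b"hello through the pipe")
    os.close(w)
    os.waitpid(pid, 0)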
Returning to torchgpipe, let's preview some content from the paper:
For this fine-grained order control, torchgpipe implements checkpointing with two separate autograd functions, Checkpoint and Recompute. During the execution of a task, a pair of Checkpoint and Recompute that share memory is generated; during backward propagation this shared memory is used to hand the local computation graph produced by Recompute over to Checkpoint.
Because there is so much parallel processing to coordinate, in the compute method of the Pipeline class we can see tasks being put into in_queues and results being taken out of out_queues.
def compute(self,
            schedule: List[Tuple[int, int]],
            skip_trackers: List[SkipTrackerThroughPotals],
            in_queues: List[InQueue],
            out_queues: List[OutQueue],
            ) -> None:
    # ... (setup of batches, partitions, streams, copy_streams, checkpoint_stop omitted)

    # With checkpointing, the autograd graph looks like this diagram:
    # ┌─────┸──────┐
    # │    Copy    │
    # └─────┰──────┘   (fence)
    # ─ ─ ─ ╂ ─ ─ ─ ─ ─
    #       ┃          (compute)
    # ┌─────┸──────┐
    # │    Wait    │ [1] Synchronize the current stream with the copy stream.
    # └─────┰──────┘
    # ┌─────┸──────┐
    # │ Checkpoint │ [2] Compute a partition within checkpointing.
    # └─────┰──────┘
    # ┌─────┸──────┐
    # │    Wait    │ [3] Synchronize the copy stream with the current stream.
    # └─────┰──────┘
    #       ┠ ─ ─ ─ ┐
    #       ┃ ┌─────┴─────┐
    #       ┃ │ Recompute │ [4] Schedule the recomputation at backpropagation.
    #       ┃ └─────┬─────┘
    #       ┠ ─ ─ ─ ┘
    #       ┃
    # ─ ─ ─ ╂ ─ ─ ─ ─ ─
    # ┌─────┸──────┐   (fence)
    # │    Copy    │
    # └─────┰──────┘
    for i, j in schedule:                 # i: micro-batch index, j: partition index
        batch = batches[i]
        partition = partitions[j]

        # Synchronize with the copied input. ([1] in the diagram)
        if j != 0:
            wait(batch, copy_streams[j][i], streams[j])

        # Determine whether checkpointing or not.
        checkpoint = (i < checkpoint_stop)
        if checkpoint:
            def function(input: TensorOrTensors,
                         partition: nn.Sequential = partition,
                         skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                         ) -> TensorOrTensors:
                with use_skip_tracker(skip_tracker):
                    return partition(input)

            chk = Checkpointing(function, batch)
            task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)
            del function, chk

        else:
            def compute(batch: Batch = batch,
                        partition: nn.Sequential = partition,
                        skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                        ) -> Batch:
                with use_skip_tracker(skip_tracker):
                    return batch.call(partition)

            task = Task(streams[j], compute=compute, finalize=None)
            del compute

        # Compute tasks in parallel. ([2] in the diagram)
        in_queues[j].put(task)            # put a new task into the j-th partition's queue;
                                          # i and j come from the clock-cycle schedule,
                                          # so forward propagation follows that order.

    for i, j in schedule:
        ok, payload = out_queues[j].get() # fetch the result of the j-th partition

        # ... (subsequent code omitted)
2.6 Summary
Let's summarize the overall logic (more details will follow in later articles):
- The system calls spawn_workers to generate several workers.
- spawn_workers generates one thread per device, each running the worker function; it also generates an in_queue and an out_queue for each device, which guarantees that operations on each device are executed serially.
- These queues are collected in (in_queues, out_queues), which are returned to the Pipeline main thread and then used as the context for exchanging task information.
- Once the Pipeline main thread holds (in_queues, out_queues), its compute method inserts each Task into the in_queue of the device it should run on.
- The worker thread blocks on its in_queue; when something arrives, it reads the Task and runs it.
- The worker thread puts the result into its out_queue.
- Pipeline's compute method fetches the run result from the out_queue for subsequent processing.
As shown below:
+----------------------------------------------+        +----------------------------------------------+
| Pipeline (main thread)                       |   1    | spawn_workers                                |
|                                              |        |                                              |
|  spawn_workers(devices) +------------------------------>  workers                                    |
|                                              |        |    device 1 : in_queue 1, out_queue 1        |
|       3  (in_queues, out_queues)             |        |    device 2 : in_queue 2, out_queue 2        |
|  <-------------------------------------------+--------+    device 3 : in_queue 3, out_queue 3        |
|                                              |        |                                              |
|  compute:                                    |        |  2 for each device:                          |
|                                              |        |      t = Thread(                             |
|    4  in_queues[j].put(task) +------------+  |        |            target=worker,                    |
|                                           |  |        |            args=(in_queue, out_queue,        |
|    7  ok, payload = out_queues[j].get()   |  |        |                  device, grad_mode),         |
|             ^                             |  |        |            daemon=True)                      |
+-------------+-----------------------------+--+        |      t.start()                               |
              |                             |           +----------------------------------------------+
            6 |                           5 |
              |                             v
    +---------+-----------------------------+-------+
    | worker thread (one per device)                |
    |                                               |
    |   task = in_queue.get()                       |
    |   batch = task.compute()                      |
    |   out_queue.put((True, (task, batch)))        |
    +-----------------------------------------------+
So far we have looked at how the data is split and at some of the runtime mechanisms. In the next article we will analyze the implementation in combination with the paper.
0xEE Personal information
★★★★ Thoughts on life and technology ★★★★★
Wechat official account: Rosie’s Thoughts
0xFF References
Markdown formula
Formula editing tutorial in Markdown
docs.nvidia.com/cuda/cuda-r…
CUDA Learning: Basics Summary
Use of CUDA Stream
NVIDIA solution architect in-depth parsing of large-scale parametric language model Megatron-Bert
Accelerating Wide & Deep Recommender Inference on GPUs
HugeCTR: High-Performance Click-Through Rate Estimation Training
discuss.pytorch.org/t/how-to-pr…
github.com/NVIDIA/apex…
github.com/justheurist…
pytorch.org/tutorials/i…
pytorch.org/docs/stable…
pytorch.org/docs/notes/…
zhuanlan.zhihu.com/p/61765561
pytorch.apachecn.org/docs/1.7/64…
zhidx.com/p/217999.ht…