0x00 Abstract
In the previous articles, we looked at the basic modules of PyTorch's distributed logic. The next few articles will show how to put these modules into practice and give an overall picture of PyTorch's distributed logic. This article introduces how to train a model by combining distributed autograd with the distributed optimizer.
This article is a partial translation of pytorch.org/tutorials/i… , reorganized according to my own understanding.
Other articles in this PyTorch distributed series are as follows:
PyTorch distributed (1) —— History and Overview
How PyTorch uses the GPU
PyTorch distributed (2) —— DataParallel (part 1)
PyTorch distributed (3) —— DataParallel (part 2)
PyTorch distributed (4) —— Distributed application concepts
PyTorch distributed (5) —— DistributedDataParallel —— overview and how to use it
PyTorch distributed (6) —— DistributedDataParallel
PyTorch distributed (7) —— DistributedDataParallel —— process groups
PyTorch distributed (8) —— DistributedDataParallel
PyTorch distributed (9) —— DistributedDataParallel —— initialization
PyTorch distributed (10) —— DistributedDataParallel —— Reducer static architecture
PyTorch distributed (11) —— DistributedDataParallel —— building the Reducer and the Join operation
PyTorch distributed (12) —— DistributedDataParallel —— forward propagation
PyTorch distributed (13) —— DistributedDataParallel —— backward propagation
PyTorch Distributed Autograd (1) —— design
PyTorch Distributed Autograd (2) —— RPC foundation
PyTorch Distributed Autograd (3)
PyTorch Distributed Autograd (4)
PyTorch Distributed Autograd (5)
PyTorch Distributed Autograd (6)
PyTorch Distributed optimizer (1) —— cornerstone
PyTorch Distributed optimizer (2) —— data-parallel optimizer
PyTorch Distributed optimizer (3) —— model parallel
0x01 Notes
First of all, the original tutorial has two parts: reinforcement learning and RNN. This article translates only the RNN part. Moreover, it does not follow the original text's order strictly; instead, the material is reorganized according to my own understanding, looking at the system from a top-down perspective.
This article uses an RNN model to show how to build distributed model-parallel training with the RPC API. The example RNN model is very small and could easily fit on a single GPU, but we still split its layers across two different workers to demonstrate distributed training. Developers can apply similar techniques to distribute much larger models across multiple devices and machines.
Note: in these official distributed articles, "worker" sometimes refers to any process in the distributed system, while the process that actually trains is usually called a "trainer". In this article, the workers consist of one trainer and one parameter server.
0x02 Startup
During startup, the run_worker method launches a trainer and a parameter server; the parameter server does nothing in this code.
import os

import torch.distributed.rpc as rpc
import torch.multiprocessing as mp


def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 1:
        # start the trainer
        rpc.init_rpc("trainer", rank=rank, world_size=world_size)
        # trainer business logic
        _run_trainer()
    else:
        # start the parameter server
        rpc.init_rpc("ps", rank=rank, world_size=world_size)
        # the parameter server does nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__ == "__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)
The details are as follows:
torch.multiprocessing.spawn
+
|
|
+-----------------+--------------------+
| |
| |
v v
+---+---------------------+ +------------+-------------+
| "ps" rank = 0 | | "trainer" rank = 1 |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
+-------------------------+ +--------------------------+
0x03 Trainer
Let's look at the training loop. After initializing the model parameters, we create the RNNModel and the DistributedOptimizer. The distributed optimizer takes a list of parameter RRefs, finds all the distinct owner workers of those parameters, and creates the given local optimizer (here SGD, though other local optimizers can be used as well) on each owner worker with the given arguments (here lr=0.05).
In the training loop, it does the following:
- First it creates the distributed autograd context, which helps the distributed autograd engine find the gradients and the RPC send/recv functions involved.
- It then runs the forward pass just like a local model and runs the distributed backward pass. For the distributed backward pass, only the list of roots needs to be specified, which in this case is the loss tensor. The distributed autograd engine automatically traverses the distributed computation graph and writes the gradients correctly.
- Next, it runs the step function of the distributed optimizer, which contacts all the relevant local optimizers to update the model parameters. One difference compared to local training is that the user does not need to call zero_grad(), because each autograd context has its own dedicated space for storing gradients; since every iteration creates a fresh context, gradients from different iterations do not accumulate into the same set of tensors (see the short sketch after this list).
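To make the zero_grad() point concrete, here is a minimal sketch (not part of the tutorial) showing where the gradients actually live. It assumes model, criterion, data, target and hidden are already defined as in the trainer code below; dist_autograd.get_gradients returns the gradients accumulated on the local worker for the given context:

import torch.distributed.autograd as dist_autograd

# Sketch only: gradients produced by the distributed backward pass are stored
# in the per-iteration context, not accumulated into param.grad.
with dist_autograd.context() as context_id:
    output, hidden = model(data, hidden)
    loss = criterion(output, target)
    dist_autograd.backward(context_id, [loss])
    local_grads = dist_autograd.get_gradients(context_id)  # {parameter tensor: gradient tensor}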
The specific code is as follows:
import torch
import torch.distributed.autograd as dist_autograd
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer

import rnn  # model definitions (EmbeddingTable, Decoder, RNNModel), see section 0x04


def _run_trainer():
    batch = 5
    ntoken = 10
    ninp = 2

    nhid = 3
    nindices = 3
    nlayers = 4
    hidden = (
        torch.randn(nlayers, nindices, nhid),
        torch.randn(nlayers, nindices, nhid)
    )

    model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)

    # setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

    def get_next_batch():
        for _ in range(5):
            data = torch.LongTensor(batch, nindices) % ntoken
            target = torch.LongTensor(batch, ntoken) % nindices
            yield data, target

    # train for 10 iterations
    for epoch in range(10):
        for data, target in get_next_batch():
            # create distributed autograd context
            with dist_autograd.context() as context_id:
                hidden[0].detach_()
                hidden[1].detach_()
                output, hidden = model(data, hidden)
                loss = criterion(output, target)
                # run distributed backward pass
                dist_autograd.backward(context_id, [loss])
                # run distributed optimizer to update parameters
                opt.step(context_id)
                # not necessary to zero grads since they are
                # accumulated into the distributed autograd context
                # which is reset every iteration.
        print("Training epoch {}".format(epoch))
The logical extension is:
torch.multiprocessing.spawn
+
|
|
+-----------------+--------------------+
| |
| |
v v
+---+---------------------+ +------------+-----------------------------------+
| "ps" rank = 0 | | "trainer" rank = 1 |
| | | |
| | | |
| | | |
| | | model = rnn.RNNModel('ps') |
| | | |
| | | |
| | | dist_autograd.backward(context_id, [loss]) |
| | | |
| | | |
| | | DistributedOptimizer.step(context_id) |
| | | |
| | | |
| | | |
+-------------------------+ +------------------------------------------------+
0x04 Model
Let’s look at the concrete model.
4.1 Components
The RNN model design borrows from the word language model in PyTorch's example repository, which contains three main components: an embedding table, an LSTM layer, and a decoder.
4.1.1 Reference Code
It is worth posting the original reference code for comparison. As you can see, both the Embedding and the Linear layer exist as member variables of RNNModel, so the whole RNNModel is very tightly coupled.
class RNNModel(nn.Module):
    """Container module with an encoder, a recurrent module, and a decoder."""

    def __init__(self, rnn_type, ntoken, ninp, nhid, nlayers, dropout=0.5, tie_weights=False):
        super(RNNModel, self).__init__()
        self.ntoken = ntoken
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp)  # embedding table member variable
        if rnn_type in ['LSTM', 'GRU']:
            self.rnn = getattr(nn, rnn_type)(ninp, nhid, nlayers, dropout=dropout)
        else:
            nonlinearity = {'RNN_TANH': 'tanh', 'RNN_RELU': 'relu'}[rnn_type]
            self.rnn = nn.RNN(ninp, nhid, nlayers, nonlinearity=nonlinearity, dropout=dropout)
        self.decoder = nn.Linear(nhid, ntoken)  # decoder member variable

        # the rest of the code is omitted
4.1.2 Distributed changes
Let's see how the model above can be modified for the distributed setting.

The following code wraps the embedding table and the decoder into sub-modules so that their constructors can be passed to the RPC API. In the EmbeddingTable sub-module, we intentionally put the Embedding layer on the GPU for demonstration purposes. As of v1.4, RPC always creates CPU tensor arguments and return values on the target worker, so if a function expects a GPU tensor, it must be moved to the appropriate device explicitly.
import torch.nn as nn


class EmbeddingTable(nn.Module):
    r"""
    Encoding layers of the RNNModel
    """
    def __init__(self, ntoken, ninp, dropout):
        super(EmbeddingTable, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp).cuda()
        self.encoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, input):
        return self.drop(self.encoder(input.cuda())).cpu()


class Decoder(nn.Module):
    def __init__(self, ntoken, nhid, dropout):
        super(Decoder, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.decoder = nn.Linear(nhid, ntoken)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, output):
        return self.decoder(self.drop(output))
4.2 The RNN model
As mentioned earlier, to achieve distributed model-parallel training, developers can split the model into sub-modules. With the sub-modules above, we can now combine them with RPC to create the RNN model. We call RPC to create the sub-module instances remotely and use RRefs to locate them when necessary. As you can see in the code below, it looks very similar to single-machine model-parallel training; the main difference is that Tensor.to(device) is replaced with RPC.

ps denotes a parameter server, which hosts the parameters of the embedding table and the decoder. The constructor uses the remote API to create an EmbeddingTable object and a Decoder object on the parameter server, and creates the LSTM sub-module locally.

During the forward pass, the trainer uses the EmbeddingTable RRef to find the remote sub-module and passes the input data to the EmbeddingTable via RPC, fetching the lookup result back. It then runs the embedding through the local LSTM layer, and finally sends the output to the Decoder sub-module with another RPC.
import torch.nn as nn
import torch.distributed.rpc as rpc


class RNNModel(nn.Module):
    def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(RNNModel, self).__init__()

        # setup embedding table remotely
        self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
        # setup LSTM locally
        self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
        # setup decoder remotely
        self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))

    def forward(self, input, hidden):
        # pass input to the remote embedding table and fetch emb tensor back
        emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
        output, hidden = self.rnn(emb, hidden)
        # pass output to the remote decoder and get the decoded output back
        decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
        return decoded, hidden
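The _remote_method helper used above is defined in the tutorial's accompanying code rather than in this excerpt. A minimal sketch of such a helper (following the pattern in the official RNN example) runs a method on the worker that owns the RRef via rpc.rpc_sync and fetches the result back:

import torch.distributed.rpc as rpc


def _call_method(method, rref, *args, **kwargs):
    # executed on the owner of rref: unwrap the RRef and call the method locally
    return method(rref.local_value(), *args, **kwargs)


def _remote_method(method, rref, *args, **kwargs):
    # run `method` on the worker that owns `rref` and block until the result returns
    args = [method, rref] + list(args)
    return rpc.rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs)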
Therefore, the logical diagram is extended as follows:
torch.multiprocessing.spawn
+
|
|
+-----------------+--------------------+
| |
| |
v v
+---------+------------+ +---------------------+-------------------------------------+
|"ps" rank = 0 | | "trainer" rank = 1 |
| | | |
| | | model = rnn.RNNModel('ps') | | | | | | +---------------+ | | +---------------------------------------+ | | |EmbeddingTable | | | | RNNModel | | | | | | | | | | | | | <--------------+ self.emb_table_rref | | | +---------------+ | | | | | | +---------------+ | | | | | | |Decoder | <--------------+ self.decoder_rref | | | | | | | | | | | | | | | | self.rnn = LSTM | | | | | | | | | | | +---------------+ | | +---------------------------------------+ | | | | | | | | | | | | forward() { | | | | emb = _remote_method(EmbeddingTable.forward,input) |
| | | output, hidden = self.rnn(emb, hidden) |
+----------------------+ | decoded = _remote_method(Decoder.forward, output) |
| } |
| |
| |
| dist_autograd.backward(context_id, [loss]) |
| |
| |
| DistributedOptimizer.step(context_id) |
| |
+-----------------------------------------------------------+
4.3 Distributed optimizer
Before introducing the distributed optimizer, let's add a helper function that generates a list of RRefs to the model parameters, which the distributed optimizer will consume. In local training, an application can call module.parameters() to obtain references to all parameter tensors and pass them to a local optimizer for subsequent updates. However, this API does not work in a distributed training scenario, because some parameters live on remote machines. Therefore, instead of taking a list of parameter tensors, the distributed optimizer takes a list of RRefs, with one RRef per model parameter, covering both local and remote parameters. The helper function is quite simple: it calls module.parameters() and creates a local RRef for each parameter.
def _parameter_rrefs(module):
    param_rrefs = []
    for param in module.parameters():
        param_rrefs.append(RRef(param))
    return param_rrefs
Then, since the RNNModel contains three sub-modules, we need to call _parameter_rrefs three times and wrap the calls in another helper function.
class RNNModel(nn.Module):
    ...
    def parameter_rrefs(self):
        remote_params = []
        # get RRefs of embedding table
        remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
        # create RRefs for local parameters
        remote_params.extend(_parameter_rrefs(self.rnn))
        # get RRefs of decoder
        remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
        return remote_params
In the trainer, a distributed optimizer is then created as shown below, taking those (partly remote) parameters as optimization targets.
# setup distributed optimizer
opt = DistributedOptimizer(
    optim.SGD,
    model.parameter_rrefs(),
    lr=0.05,
)
Our final expansion is as follows:
- (1) RNNModel's emb_table_rref member variable points to the EmbeddingTable on the parameter server.
- (2) RNNModel's decoder_rref member variable points to the Decoder on the parameter server.
- (3) RNNModel's rnn member variable points to the local LSTM.
- The three sets of variables to be optimized inside the DistributedOptimizer point to: (4) the parameters of the EmbeddingTable on the parameter server, (5) the parameters of the Decoder on the parameter server, and (6) the parameters of the local LSTM.

These correspond to the numbers in the diagram below.
torch.multiprocessing.spawn
+
|
|
+---------------+--------------------+
| |
| |
v v
+---------+------------+ +---------------------+----------------------------------------+
|"ps" rank = 0 | | "trainer" rank = 1 |
| | | |
| | | model = rnn.RNNModel('ps') |
| | | |
| +---------------+ | | +---------------------------------------+ |
| |EmbeddingTable | | | | RNNModel | |
+--->+ | | | 1 | | |
| | | +<------------+ self.emb_table_rref | +------+ |
| | +---------------+ | | | 3 | |LSTM | 6 |
| | | | | self.rnn +---------------------------->+ +<---+ |
| | +---------------+ | | 2 | | | | | |
| | |Decoder +<------------+ self.decoder_rref | +------+ | |
| | | | | | | | | |
| | | | | | +---------------------------------------+ | |
| | | | | | | |
| | +------+--------+ | | forward() { | |
| | ^ | | emb = _remote_method(EmbeddingTable.forward, input) | |
| | | | | output, hidden = self.rnn(emb, hidden) | |
| | | | | decoded = _remote_method(Decoder.forward, output) | |
| | | | | } | |
| +----------------------+ | | |
| | | dist_autograd.backward(context_id, [loss]) | |
| | | | |
| 5 | 4 | +------------------------------------------------------+ | |
| | | | DistributedOptimizer | | |
| | | | | | |
| | | | remote_optimizers = [ | | |
+-------------------------------------------------------+ optim_rref1, | | |
| | | optim_rref2+------------------+ |
+-------------------------------------------+ optim_rref3 | |
| | | |
| | ] | |
| | step(context_id) | |
| +------------------------------------------------------+ |
+--------------------------------------------------------------+
4.4 Comparison
As mentioned earlier, distributed model-parallel training looks very similar to single-machine model-parallel training; the main difference is that Tensor.to(device) is replaced with RPC. Let's replace the parameter server with a GPU and make a rough comparison. It may not be exact, but it highlights the key point of distributed training. A single-process sketch of the same structure follows the diagram below.
+----------------------+ +-------------------------------------------------------------+
| GPU | | CPU rank = 0 |
| | | |
| | | model = rnn.RNNModel() |
| | | |
| +---------------+ | | +---------------------------------------+ |
| |EmbeddingTable | | | | RNNModel | |
+--->+ | | | 1 | | |
| | | +<------------+ self.emb_table_rref | +------+ |
| | +---------------+ | | | 3 | |LSTM | 6 |
| | | | | self.rnn +--------------------------->+ +<---+ |
| | +---------------+ | | 2 | | | | | |
| | |Decoder +<------------+ self.decoder_rref | +------+ | |
| | | | | | | | | |
| | | | | | +---------------------------------------+ | |
| | | | | | | |
| | +------+--------+ | | forward() { | |
| | ^ | | emb = EmbeddingTable.forward(input) | |
| | | | | output, hidden = self.rnn(emb, hidden) | |
| | | | | decoded = Decoder.forward(output) | |
| | | | | } | |
| +----------------------+ | | |
| | | loss.backward() | |
| | | | |
| 5 | 4 | +----------------------------------------+ | |
| | | | Optimizer | | |
| | | | | | |
| | | | param_groups = [ | | |
+-------------------------------------------------------+ optim_rref1, | | |
| | | | | |
| | | optim_rref2+-----------------+ |
| | | | |
+-------------------------------------------+ optim_rref3 | |
| | ] | |
| | step() | |
| | | |
| +----------------------------------------+ |
+-------------------------------------------------------------+
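To make the comparison concrete, here is a rough single-process sketch (a hypothetical LocalRNNModel, not part of the original tutorial) that reuses the EmbeddingTable and Decoder sub-modules defined above as plain members instead of RRefs; data moves between devices with .cuda()/.cpu() inside the sub-modules rather than over RPC:

import torch.nn as nn


class LocalRNNModel(nn.Module):
    # Hypothetical single-process analogue of the distributed RNNModel above:
    # sub-modules are ordinary members instead of remote RRefs, and no RPC is involved.
    def __init__(self, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(LocalRNNModel, self).__init__()
        self.emb_table = EmbeddingTable(ntoken, ninp, dropout)    # lives on the GPU
        self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)  # stays on the CPU
        self.decoder = Decoder(ntoken, nhid, dropout)             # stays on the CPU

    def forward(self, input, hidden):
        emb = self.emb_table(input)             # device moves happen inside EmbeddingTable
        output, hidden = self.rnn(emb, hidden)
        decoded = self.decoder(output)
        return decoded, hidden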
0xEE Personal information
★★★★ Thoughts on life and technology ★★★★★
Wechat official account: Rosie’s Thoughts
0xFF Reference
GETTING STARTED WITH DISTRIBUTED RPC FRAMEWORK