0x00 Abstract

In the previous articles, we introduced PipeDream's overall architecture, the profile phase, the compute partitioning phase, and the model transformation phase. In this article, we introduce the runtime execution engine, which is a unified infrastructure layer.

Other articles in this pipeline parallelism series are linked below:

Deep learning pipeline parallelism GPipe (1) -- basic pipeline implementation

Deep learning pipeline parallelism GPipe (2) -- gradient accumulation

Deep learning pipeline parallelism PipeDream (1) -- profile stage

Deep learning pipeline parallelism PipeDream (2) -- computing partitions

Deep learning pipeline parallelism PipeDream (3) -- model transformation

0x01 Preface

1.1 Review

In the previous articles, we went through three phases: profiling, compute partitioning, and model transformation. At this point we have a set of Python model files and configuration files. Once PipeDream loads these files, it is ready to train.

Starting with this article, we introduce the supporting systems needed for training, beginning with the runtime execution engine. The main idea is to look at what features a deep learning training runtime should include.

1.2 Runtime system

With the previous analysis in mind, let's consider why a runtime should be implemented at all, and what features deep learning (specifically, pipeline parallelism) requires of it.

1.2.1 Features of PyTorch

First, let’s look at PyTorch’s features:

  • PyTorch itself implements autograd, so backpropagation is differentiated automatically.
  • PyTorch implements DDP (DistributedDataParallel) for distributed data parallelism.
  • PyTorch also provides RPC as the foundation for distributed model parallelism. However, RPC was only introduced as stable in PyTorch 1.5, released on 2020-06-12.
  • On top of DDP and RPC, PyTorch also implements distributed autograd, shielding users from many details of distributed training. (We will have a series on PyTorch's distributed features later.)

1.2.2 PyTorch RPC

RPC functionality was officially introduced in PyTorch 1.5, released on 2020-06-12, as shown below.

Distributed RPC framework APIs [Now Stable]

The torch.distributed.rpc package aims at supporting a wide range of distributed training paradigms that do not fit into DistributedDataParallel. Examples include parameter server training, distributed model parallelism, and distributed pipeline parallelism. Features in the torch.distributed.rpc package can be categorized into four main sets of APIs.

  • The RPC API allows running a function on a specified destination worker with given arguments and fetches the return value or creates a distributed reference to the return value.
  • The RRef (Remote REFerence) serves as a reference to an object on another worker. A worker holding an RRef can explicitly request copies of the object, and it can also share the light-weight RRef with other workers without worrying about reference counting. This is especially useful when multiple workers need to repeatedly access different versions of the same remote object.
  • With Distributed Autograd, applications can automatically compute gradients even if a model is split on multiple workers using RPC. This is achieved by stitching together local autograd graphs at RPC boundaries in the forward pass and reaching out to participants to transparently launch local autograd in the backward pass.
  • The Distributed Optimizer uses gradients computed by Distributed Autograd to update model parameters. Its constructor takes a local optimizer (e.g., SGD, Adagrad, etc.) and a list of parameter RRefs, and its step() function automatically uses the local optimizer to update parameters on all distinct RRef owner workers.
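For readers unfamiliar with these APIs, here is a minimal sketch that exercises the four groups above. It is not part of PipeDream (which predates them), and it assumes that rpc.init_rpc has already been called and that a worker named "worker1" exists.

import torch
import torch.distributed.rpc as rpc
import torch.distributed.autograd as dist_autograd
from torch.distributed.optim import DistributedOptimizer  # takes a local optimizer class plus parameter RRefs

def add(a, b):
    return a + b

# 1) RPC: run a function on another worker and fetch the return value.
result = rpc.rpc_sync("worker1", add, args=(torch.ones(2), torch.ones(2)))

# 2) RRef: a reference to an object living on another worker.
rref = rpc.remote("worker1", torch.ones, args=(2, 2))

# 3) Distributed autograd: stitch local autograd graphs together at RPC boundaries.
with dist_autograd.context() as context_id:
    t = torch.ones(2, requires_grad=True)
    loss = rpc.rpc_sync("worker1", add, args=(t, torch.ones(2))).sum()
    dist_autograd.backward(context_id, [loss])
    # 4) Distributed optimizer: would update parameters held behind RRefs, e.g.
    # DistributedOptimizer(torch.optim.SGD, [rref], lr=0.05).step(context_id)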

However, the PipeDream paper was published in 2019, which meant PipeDream could not rely on PyTorch RPC and had to implement its own communication logic, i.e., its own support for a distributed computational graph.

1.2.3 Features of PipeDream

Second, let's look at the features of PipeDream:

  • PipeDream combines model parallelism and data parallelism to realize pipeline parallelism.
  • PipeDream splits a complete deep learning model, placing each submodel (subgraph) on a different node.

1.2.4 Combination

Combining the above two points: for PipeDream, DDP, model parallelism, and autograd alone cannot meet our needs; they must be used together with additional machinery.

At a minimum, PipeDream needs to implement the following itself:

  • Communication between stages (nodes). This could use PyTorch RPC, but since no stable version existed at the time, PipeDream had to implement a distributed computational graph itself, using PyTorch Distributed's P2P functionality.
  • Because of this communication, the sending and receiving of each stage (node) must be managed by PipeDream itself, i.e., configuring and managing the producers and consumers of each stage. This also means determining the inputs and outputs of each stage.
  • Because P2P communication is used, each tensor needs a unique identifier (the tag discussed below; a minimal sketch of such tagged P2P communication follows this list).
  • Data parallelism within a single stage (across several ranks) should use PyTorch DDP.
  • Because data parallelism is used, the degree of parallelism of each stage must be managed explicitly.
  • Because model parallelism is combined with data parallelism, process groups must be managed explicitly.
  • Because the training script runs on different nodes (machines), each machine needs to configure its own part of the training job independently when launching the script.
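To make the tag point concrete, here is a minimal sketch (not PipeDream's actual code) of tagged point-to-point send/receive built on torch.distributed; the ranks, shapes, and tag values are hypothetical placeholders, and dist.init_process_group is assumed to have been called already.

import torch
import torch.distributed as dist

def send_activation(tensor, dst_rank, tag):
    # Blocking P2P send; the tag says which logical tensor (e.g., "out2") this is.
    dist.send(tensor.contiguous(), dst=dst_rank, tag=tag)

def recv_activation(shape, dtype, src_rank, tag):
    # The receiver must pre-allocate a buffer of the agreed shape and dtype,
    # which is why the runtime records tensor shapes, dtypes, and tags up front.
    buffer = torch.zeros(shape, dtype=dtype)
    dist.recv(buffer, src=src_rank, tag=tag)
    return buffer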

We now analyze each of these function points in turn.

0x02 Usage

2.1 How to Call

The main_with_runtime.py script can be run on multiple nodes with different startup parameters, such as rank, so that different stages of the model can be run on each node.

python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 0 --local_rank 0 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 1 --local_rank 1 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 2 --local_rank 2 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 3 --local_rank 3 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo

2.2 Overall Logic

The general logic for using the runtime lives in runtime/translation/main_with_runtime.py. The main steps are:

  • Parse input arguments

  • Load and generate the model

  • Build the model based on the modules

  • Configure parameters such as input size, batch size, etc.

  • Traverse each layer of the model (skipping the last loss layer)

    • Build input tensors by iterating over each layer's inputs
    • Build the outputs by calling the forward function of the corresponding stage
    • Iterate over each layer's outputs, recording their types and shapes
  • Build the output tensor shapes and types

  • Load the configuration file

  • Build a StageRuntime

  • Build the optimizer

  • Load the dataset

  • Train and save checkpoints

The overall code is as follows:

def main():
    # Parse input arguments
    global args, best_prec1
    args = parser.parse_args()
​
    # Special case handling for GNMT model
    l2_promote()
​
    torch.cuda.set_device(args.local_rank)
​
    # build tokenizer
    tokenizer = Tokenizer(os.path.join(args.data_dir, config.VOCAB_FNAME))
​
    # define loss function
    criterion = build_gnmt_criterion(
        vocab_size=tokenizer.vocab_size, padding_idx=config.PAD, smoothing=0.1)
​
    # create stages of the model
    # Load and generate the model
    module = importlib.import_module(args.module)
    args.arch = module.arch()
    # Build the model based on the module
    model = module.model(criterion)
​
    # Configure based on the arguments, e.g., input size, batch size
    input_size = [args.max_length_train, args.batch_size]
    training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
                              "input2": input_size, "target": [args.max_length_train * args.batch_size],
                              "target_length": [args.batch_size]}
    dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
              "target": torch.int64, "target_length": torch.int32}
    inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
    target_tensor_names = {"target", "target_length"}
    
    # Traverse each layer of the model (skip the last loss layer)
    for module_id, (stage, inputs, outputs) in enumerate(model[:-1]):  # Skip last layer (loss).
        input_tensors = []
        # Traverse each layer's inputs and build input tensors
        for module_input in inputs:
            if module_input in inputs_module_destinations:
                inputs_module_destinations[module_input] = module_id
​
            input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
                                      dtype=dtypes[module_input])#.cuda()
            input_tensors.append(input_tensor)
        #stage.cuda()
        # PyTorch should not maintain metadata for a backward pass on
        # synthetic inputs. Without the following line, the runtime is
        # as much as 1.5x slower in a full DP configuration.
        with torch.no_grad():
            # Build outputs by calling the stage's forward function
            output_tensors = stage(*tuple(input_tensors))
        if not type(output_tensors) is tuple:
            output_tensors = [output_tensors]
        # Traverse each layer's outputs and set their types and shapes
        for output, output_tensor in zip(outputs,
                                         list(output_tensors)):
            # output 是 ['out2', 'out1']
            training_tensor_shapes[output] = list(output_tensor.size())
            dtypes[output] = output_tensor.dtype
​
    # Build the output tensor shapes and types
    eval_tensor_shapes = {}
    for key in training_tensor_shapes:
        eval_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])
        training_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])
​
    # Load the configuration file
    configuration_maps = {
        'module_to_stage_map': None,
        'stage_to_rank_map': None,
        'stage_to_depth_map': None
    }
    if args.config_path is not None:
        json_config_file = json.load(open(args.config_path, 'r'))
        configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
        configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
        configuration_maps['stage_to_rank_map'] = {
            int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
        configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)
​
    # Build a StageRuntime
    r = runtime.StageRuntime(
        model=model, distributed_backend=args.distributed_backend,
        fp16=args.fp16, loss_scale=args.loss_scale,
        training_tensor_shapes=training_tensor_shapes,
        eval_tensor_shapes=eval_tensor_shapes,
        training_tensor_dtypes=dtypes,
        inputs_module_destinations=inputs_module_destinations,
        target_tensor_names=target_tensor_names,
        configuration_maps=configuration_maps,
        master_addr=args.master_addr,
        rank=args.rank, local_rank=args.local_rank,
        num_ranks_in_server=args.num_ranks_in_server,
        verbose_freq=args.verbose_frequency,
        model_type=runtime.TRANSLATION,
        enable_recompute=args.recompute)
​
    # stage needed to determine if current stage is the first stage
    # num_stages needed to determine if current stage is the last stage
    # num_ranks needed to determine number of warmup_minibatches in case of pipelining
    args.stage = r.stage
    args.num_stages = r.num_stages
    args.num_ranks = r.num_ranks
    if not is_first_stage():
        args.synthetic_data = True
​
    # define optimizer
    if args.no_input_pipelining:
        num_versions = 1
    else:
        # number of versions is the total number of machines following the current
        # stage, shared amongst all replicas in this stage
        num_versions = r.num_warmup_minibatches + 1
​
    # if specified, resume from checkpoint
    if args.resume:
        checkpoint_file_path = "%s.%d.pth.tar" % (args.resume, r.stage)
        assert os.path.isfile(checkpoint_file_path)
        print("=> loading checkpoint '{}'".format(checkpoint_file_path))
        checkpoint = torch.load(checkpoint_file_path)
        args.start_epoch = checkpoint['epoch']
        best_prec1 = checkpoint['best_prec1']
        r.load_state_dict(checkpoint['state_dict'])
        print("=> loaded checkpoint '{}' (epoch {})"
                .format(checkpoint_file_path, checkpoint['epoch']))
​
    # TODO: make this configurable by args
    # Build the optimizer
    use_adam_optimizer = True
    if use_adam_optimizer:
        optimizer = adam.AdamWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, betas=(0.9,0.999),
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,
            macrobatch=args.macrobatch)
    else:
        optimizer = sgd.SGDWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, momentum=args.momentum,
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)
​
    if args.resume:
        optimizer.load_state_dict(checkpoint['optimizer'])
​
    cudnn.benchmark = True
​
    # Load the dataset
    train_dataset = LazyParallelDataset(
        src_fname=os.path.join(args.data_dir, config.SRC_TRAIN_FNAME),
        tgt_fname=os.path.join(args.data_dir, config.TGT_TRAIN_FNAME),
        tokenizer=tokenizer,
        min_len=args.min_length_train,
        max_len=args.max_length_train,
        sort=False,
        max_size=None)
​
    val_dataset = ParallelDataset(
        src_fname=os.path.join(args.data_dir, config.SRC_VAL_FNAME),
        tgt_fname=os.path.join(args.data_dir, config.TGT_VAL_FNAME),
        tokenizer=tokenizer,
        min_len=args.min_length_train,
        max_len=args.max_length_train,
        sort=True)
​
    distributed_sampler = False
    if configuration_maps['stage_to_rank_map'] is not None:
        num_ranks_in_first_stage = len(configuration_maps['stage_to_rank_map'][0])
        if num_ranks_in_first_stage > 1:
            distributed_sampler = True
​
    # TODO: fix random seeds
    train_loader = train_dataset.get_loader(
        batch_size=args.batch_size, seeds=range(args.epochs),
        batch_first=False, shuffle=True,
        bucketing=not args.no_bucketing, num_workers=args.workers,
        world_size=r.num_ranks_in_first_stage,
        rank=r.rank_in_stage if r.stage == 0 else 0
    )
​
    val_loader = val_dataset.get_loader(
        batch_size=args.batch_size, batch_first=False,
        shuffle=True, num_workers=args.workers,
        world_size=r.num_ranks_in_first_stage,
        seeds=range(args.epochs),
        rank=r.rank_in_stage if r.stage == 0 else 0
    )
​
    # if checkpoint is loaded, start by running validation
    if args.resume:
        assert args.start_epoch > 0
        validate(val_loader, r, args.start_epoch-1)
​
    # Train and save checkpoints
    for epoch in range(args.start_epoch, args.epochs):
        if distributed_sampler:
            train_loader.sampler.set_epoch(epoch)
        adjust_learning_rate(optimizer, epoch, args.epochs, r, args.lr_policy)
​
        # train or run forward pass only for one epoch
        if args.forward_only:
            validate(val_loader, r, epoch)
        else:
            train(train_loader, r, optimizer, epoch)
​
            # evaluate on validation set
            prec1 = validate(val_loader, r, epoch)
            if r.stage != r.num_stages: prec1 = 0
​
            # remember best prec@1 and save checkpoint
            best_prec1 = max(prec1, best_prec1)
​
            should_save_checkpoint = args.checkpoint_dir_not_nfs or r.rank_in_stage == 0
            if args.checkpoint_dir and should_save_checkpoint:
                save_checkpoint({
                    'epoch': epoch + 1,
                    'arch': args.arch,
                    'state_dict': r.state_dict(),
                    'best_prec1': best_prec1,
                    'optimizer' : optimizer.state_dict(),
                    'tokenizer': tokenizer.get_state()
                }, args.checkpoint_dir, r.stage, epoch)

0x03 Loading the Model

Let’s first look at how to load the model.

3.1 Model File

The model files were generated in the earlier phases; here we load them, taking the files under ../translation/models/gnmt/gpus=4/ as an example.

The __init__.py file there looks like this:

from .gnmt import GNMTSplit 
from .stage0 import Stage0
from .stage1 import Stage1
from .stage2 import Stage2
from .stage3 import Stage3
​
def arch():
    return "gnmt"
​
def model(criterion):
    return [
        (Stage0(), ["input0", "input1"], ["out2", "out1"]),
        (Stage1(), ["out2", "input1", "input2", "out1"], ["out3", "out7"]),
        (Stage2(), ["out3", "out7"], ["out8", "out9", "out10"]),
        (Stage3(), ["out8", "out9", "out10"], ["out12"]),
        (criterion, ["out12"], ["loss"])
    ]
​
def full_model():
    return GNMTSplit()

The format of each item is as follows:

(stage, inputs, outputs)

So you need to load it in this format.
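As a minimal, hypothetical sketch of consuming this format (assuming the model() function from the __init__ file above and some criterion object), each triple can be unpacked directly:

# Each entry returned by model(criterion) is a (stage_module, input_names, output_names) triple.
for stage_module, input_names, output_names in model(criterion):
    print(type(stage_module).__name__, input_names, "->", output_names)
    # e.g., Stage0 ['input0', 'input1'] -> ['out2', 'out1']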

3.2 Loading

The specific loading method is as follows:

# create stages of the model
module = importlib.import_module(args.module)
args.arch = module.arch()
The resulting module looks like this:

module = {module} <module 'translation.models.gnmt.gpus=4' from '../translation/models/gnmt/gpus=4/__init__.py'>
 GNMTSplit = {type} <class 'translation.models.gnmt.gpus=4.gnmt.GNMTSplit'>
 Stage0 = {type} <class 'translation.models.gnmt.gpus=4.stage0.Stage0'>
 Stage1 = {type} <class 'translation.models.gnmt.gpus=4.stage1.Stage1'>
 Stage2 = {type} <class 'translation.models.gnmt.gpus=4.stage2.Stage2'>
 Stage3 = {type} <class 'translation.models.gnmt.gpus=4.stage3.Stage3'>
 gnmt = {module} <module 'translation.models.gnmt.gpus=4.gnmt' from '../translation/models/gnmt/gpus=4/gnmt.py'>
 stage0 = {module} <module 'translation.models.gnmt.gpus=4.stage0' from '../translation/models/gnmt/gpus=4/stage0.py'>
 stage1 = {module} <module 'translation.models.gnmt.gpus=4.stage1' from '../translation/models/gnmt/gpus=4/stage1.py'>
 stage2 = {module} <module 'translation.models.gnmt.gpus=4.stage2' from '../translation/models/gnmt/gpus=4/stage2.py'>
 stage3 = {module} <module 'translation.models.gnmt.gpus=4.stage3' from '../translation/models/gnmt/gpus=4/stage3.py'>

3.3 Model Building

The next step is to build the model in terms of modules.

model = module.model(criterion)
Copy the code

Criterion here is LabelSmoothing().

In the module.model(criterion) call, Stage0() through Stage3() are instantiated one by one to build each stage.

For example, Stage3 calls the __init__ function.

class Stage3(torch.nn.Module):
    def __init__(self):
        super(Stage3, self).__init__()
        self.layer5 = torch.nn.LSTM(2048, 1024)
        self.layer8 = Classifier(1024, 32320)

The resulting model is as follows:

model = {list: 5}
 0 = {tuple: 3}
  0 = {Stage0} Stage0(
        (layer4): Embedding(32320, 1024, padding_idx=0)
        (layer5): EmuBidirLSTM(
          (bidir): LSTM(1024, 1024, bidirectional=True)
          (layer1): LSTM(1024, 1024)
          (layer2): LSTM(1024, 1024)
        )
        (layer6): Dropout(p=0.2, inplace=False)
        (layer7): LSTM(2048, 1024)
        (layer9): Dropout(p=0.2, inplace=False)
      )
  1 = {list: 2} ['input0', 'input1']
  2 = {list: 2} ['out2', 'out1']
  __len__ = {int} 3
 1 = {tuple: 3}
  0 = {Stage1} Stage1(
        (layer6): LSTM(1024, 1024)
        (layer9): Embedding(32320, 1024, padding_idx=0)
        (layer11): Dropout(p=0.2, inplace=False)
        (layer12): LSTM(1024, 1024)
        (layer15): RecurrentAttention(
          (rnn): LSTM(1024, 1024)
          (attn): BahdanauAttention(
            (linear_q): Linear(in_features=1024, out_features=1024, bias=False)
            (linear_k): Linear(in_features=1024, out_features=1024, bias=False)
            (dropout): Dropout(p=0, inplace=False)
          )
          (dropout): Dropout(p=0, inplace=False)
        )
      )
  1 = {list: 4} ['out2', 'input1', 'input2', 'out1']
  2 = {list: 2} ['out3', 'out7']
  __len__ = {int} 3
 2 = {tuple: 3}
  0 = {Stage2} Stage2(
        (layer7): Dropout(p=0.2, inplace=False)
        (layer9): LSTM(2048, 1024)
        (layer11): Dropout(p=0.2, inplace=False)
        (layer13): LSTM(2048, 1024)
        (layer16): Dropout(p=0.2, inplace=False)
      )
  1 = {list: 2} ['out3', 'out7']
  2 = {list: 3} ['out8', 'out9', 'out10']
  __len__ = {int} 3
 3 = {tuple: 3}
  0 = {Stage3} Stage3(
        (layer5): LSTM(2048, 1024)
        (layer8): Classifier(
          (classifier): Linear(in_features=1024, out_features=32320, bias=True)
        )
      )
  1 = {list: 3} ['out8', 'out9', 'out10']
  2 = {list: 1} ['out12']
  __len__ = {int} 3
 4 = {tuple: 3} (LabelSmoothing(), ['out12'], ['loss'])
  0 = {LabelSmoothing} LabelSmoothing()
  1 = {list: 1} ['out12']
  2 = {list: 1} ['loss']
  __len__ = {int} 3
 __len__ = {int} 5

3.4 Inputs and Outputs

Once the model is loaded, set the inputs and outputs as follows:

  • Configure parameters based on the arguments.

  • Iterate through each layer of the model (skipping the last loss layer) to do the following:

    • Build input tensors by iterating over each layer's inputs.
    • Build outputs by calling the forward function of the corresponding stage.
    • Iterate over each layer's outputs, recording their types.
    • Build the tensor shapes.

Note that the format of each sub-module is as follows:

(Stage0 (), [" input0 ", "input1"], # input [" out2 ", "out1"] # output)Copy the code

The annotated code is as follows:

# Configure based on the arguments, e.g., input size, batch size
input_size = [args.max_length_train, args.batch_size]
training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
                          "input2": input_size, "target": [args.max_length_train * args.batch_size],
                          "target_length": [args.batch_size]}
dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
          "target": torch.int64, "target_length": torch.int32}
inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
target_tensor_names = {"target", "target_length"}

# Traverse each layer of the model (skip the last loss layer)
for module_id, (stage, inputs, outputs) in enumerate(model[:-1]):
    input_tensors = []
    # Traverse each layer's inputs and build input tensors
    for module_input in inputs:
        if module_input in inputs_module_destinations:
            inputs_module_destinations[module_input] = module_id

        input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
                                  dtype=dtypes[module_input]).cuda()
        input_tensors.append(input_tensor)
    stage.cuda()
    # PyTorch should not maintain metadata for a backward pass on
    # synthetic inputs. Without the following line, the runtime is
    # as much as 1.5x slower in a full DP configuration.
    with torch.no_grad():
        # Build outputs by calling the stage's forward function
        output_tensors = stage(*tuple(input_tensors))
    if not type(output_tensors) is tuple:
        output_tensors = [output_tensors]
    # Traverse each layer's outputs and set their types and shapes
    for output, output_tensor in zip(outputs, list(output_tensors)):
        # output is, e.g., ['out2', 'out1']
        training_tensor_shapes[output] = list(output_tensor.size())
        dtypes[output] = output_tensor.dtype

# Build the output tensor shapes and types
eval_tensor_shapes = {}
for key in training_tensor_shapes:
    eval_tensor_shapes[key] = tuple(
        training_tensor_shapes[key])
    training_tensor_shapes[key] = tuple(
        training_tensor_shapes[key])

This gives the shapes and types of the outputs:

dtypes = {dict: 13}
 'input0' = {dtype} torch.int64
 'input1' = {dtype} torch.int64
 'input2' = {dtype} torch.int64
 'target' = {dtype} torch.int64
 'target_length' = {dtype} torch.int32
 'out2' = {dtype} torch.float32
 'out1' = {dtype} torch.float32
 'out3' = {dtype} torch.float32
 'out7' = {dtype} torch.float32
 'out8' = {dtype} torch.float32
 'out9' = {dtype} torch.float32
 'out10' = {dtype} torch.float32
 'out12' = {dtype} torch.float32
 __len__ = {int} 13

training_tensor_shapes = {dict: 13}
 'input0' = {tuple: 2} (50, 128)
 'input1' = {tuple: 1} 128
 'input2' = {tuple: 2} (50, 128)
 'target' = {tuple: 1} 6400
 'target_length' = {tuple: 1} 128
 'out2' = {tuple: 3} (50, 128, 1024)
 'out1' = {tuple: 3} (50, 128, 1024)
 'out3' = {tuple: 3} (50, 128, 1024)
 'out7' = {tuple: 3} (50, 128, 1024)
 'out8' = {tuple: 3} (50, 128, 1024)
 'out9' = {tuple: 3} (50, 128, 1024)
 'out10' = {tuple: 3} (50, 128, 1024)
 'out12' = {tuple: 3} (50, 128, 32320)
 __len__ = {int} 13

eval_tensor_shapes = {dict: 13}
 'input0' = {tuple: 2} (50, 128)
 'input1' = {tuple: 1} 128
 'input2' = {tuple: 2} (50, 128)
 'target' = {tuple: 1} 6400
 'target_length' = {tuple: 1} 128
 'out2' = {tuple: 3} (50, 128, 1024)
 'out1' = {tuple: 3} (50, 128, 1024)
 'out3' = {tuple: 3} (50, 128, 1024)
 'out7' = {tuple: 3} (50, 128, 1024)
 'out8' = {tuple: 3} (50, 128, 1024)
 'out9' = {tuple: 3} (50, 128, 1024)
 'out10' = {tuple: 3} (50, 128, 1024)
 'out12' = {tuple: 3} (50, 128, 32320)
 __len__ = {int} 13

3.5 Configuration

Load the configuration file generated above.

configuration_maps = {
    'module_to_stage_map': None,
    'stage_to_rank_map': None,
    'stage_to_depth_map': None
}
if args.config_path is not None:
    json_config_file = json.load(open(args.config_path, 'r'))
    configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
    configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
    configuration_maps['stage_to_rank_map'] = {
        int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
    configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)

The corresponding file is:

{
    "module_to_stage_map": [0, 1, 2, 3, 3],
    "stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}

Get:

configuration_maps = {dict: 3} 
 'module_to_stage_map' = {list: 5} [0, 1, 2, 3, 3]
 'stage_to_rank_map' = {dict: 4} {0: [0], 1: [1], 2: [2], 3: [3]}
 'stage_to_depth_map' = {NoneType} None
 __len__ = {int} 3

0x04 Runtime

To illustrate, we start main_with_runtime.py with the following parameters.

--module translation.models.gnmt.gpus=4 --data_dir=wmt16_ende_data_bpe_clean --config_path pipedream-pipedream/runtime/translation/models/gnmt/gpus=4/mp_conf.json --local_rank 3 --rank 3 --master_addr 127.0.0.1

The Runtime is built in the main function as follows. The Runtime is the execution engine that provides a unified, extensible infrastructure layer.

r = runtime.StageRuntime(
    model=model, distributed_backend=args.distributed_backend,
    fp16=args.fp16, loss_scale=args.loss_scale,
    training_tensor_shapes=training_tensor_shapes,
    eval_tensor_shapes=eval_tensor_shapes,
    training_tensor_dtypes=dtypes,
    inputs_module_destinations=inputs_module_destinations,
    target_tensor_names=target_tensor_names,
    configuration_maps=configuration_maps,
    master_addr=args.master_addr,
    rank=args.rank, local_rank=args.local_rank,
    num_ranks_in_server=args.num_ranks_in_server,
    verbose_freq=args.verbose_frequency,
    model_type=runtime.TRANSLATION,
    enable_recompute=args.recompute)

4.1 StageRuntime

StageRuntime is defined as follows. As you can see, its main member variables are metadata required for forward and backward operations within the stage, such as:

tensors, gradients, the distributed backend, the loss scale, the dtypes of the training tensors, the shapes of the output tensors, and so on.

class StageRuntime:
    def __init__(self, model, distributed_backend, fp16, loss_scale,
                 training_tensor_shapes, eval_tensor_shapes,
                 training_tensor_dtypes, inputs_module_destinations,
                 target_tensor_names, configuration_maps, master_addr,
                 rank, local_rank, num_ranks_in_server, verbose_freq,
                 model_type, enable_recompute=False):
        # Metadata needed for forward and backward pass within this stage.
        self.tensors = []
        self.gradients = {}
        self.distributed_backend = distributed_backend
        self.fp16 = fp16
        self.loss_scale = loss_scale
        self.training_tensor_shapes = training_tensor_shapes
        self.eval_tensor_shapes = eval_tensor_shapes
        self.training_tensor_dtypes = training_tensor_dtypes
        self.model_type = model_type
        self.target_tensor_names = target_tensor_names

        self.initialize(model, inputs_module_destinations, configuration_maps,
                        master_addr, rank, local_rank, num_ranks_in_server)

        self.verbose_freq = verbose_freq
        self.forward_only = False

        self.forward_stats = runtime_utilities.RuntimeStats(forward=True)
        self.backward_stats = runtime_utilities.RuntimeStats(forward=False)

        # Enable recomputation to prevent the need to save activations
        # computed from the forward pass for the backward pass.
        self.enable_recompute = enable_recompute

        # Disable recomputation for the last stage.
        if rank == num_ranks_in_server - 1:
            self.enable_recompute = False

The initialization function code is long, so let’s analyze it piece by piece.

4.2 Initialization

4.2.1 Setting Tags

At the beginning of the function, we iterate through the inputs and outputs of each layer of the model and fill in tensor_tags, giving each tensor a unique tag. The tag is passed along and is eventually used by, for example, recv in distributed_c10d.py: tensor_tag is used during communication as an identifier for P2P messages.

def recv(tensor,
         src=None,
         group=None,
         tag=0):
    """
    Receives a tensor synchronously.
​
    Args:
        tensor (Tensor): Tensor to fill with received data.
        src (int, optional): Source rank. Will receive from any
            process if unspecified.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used.
        tag (int, optional): Tag to match recv with remote send
​
    Returns:
        Sender rank
        -1, if not part of the group
​
    """
    _check_single_tensor(tensor, "tensor")
    if _rank_not_in_group(group):
        return -1
​
    if group is None:
        pg = _get_default_group()
    else:
        pg = group
​
    if src is None:
        work = pg.recv_anysource([tensor], tag)
        work.wait()
        src_rank = work._source_rank()
        if group is None or group is GroupMember.WORLD:
            return src_rank
        else:
            return _get_global_rank(pg, src_rank)
    else:
        if group is None or group is GroupMember.WORLD:
            pg.recv([tensor], src, tag).wait()
        else:
            group_src_rank = _get_group_rank(pg, src)
            pg.recv([tensor], group_src_rank, tag).wait()
        return src

The code for setting the tag is as follows:

def initialize(self, model, inputs_module_destinations,
               configuration_maps, master_addr, rank,
               local_rank, num_ranks_in_server):
    self.send_ranks = {}
    self.receive_ranks = {}
    self.rank = rank
    self.local_rank = local_rank
    self.stage = None
    self.tensor_tags = {}
    self.forward_minibatch_id = 0
    self.backward_minibatch_id = 0
    self.criterion_input_name = str(model[-1][1][0])

    tensor_tag = 1
    # The format of each layer is (_, input_tensors, output_tensors)
    for (_, input_tensors, output_tensors) in model:
        # Set a tag for each input tensor
        for input_tensor in input_tensors:
            if input_tensor not in self.tensor_tags:
                self.tensor_tags[input_tensor] = tensor_tag
                tensor_tag += 1
        # Set a tag for each output tensor
        for output_tensor in output_tensors:
            if output_tensor not in self.tensor_tags:
                self.tensor_tags[output_tensor] = tensor_tag
                tensor_tag += 1
    # Set a tag for each target tensor
    for target_tensor_name in sorted(self.target_tensor_names):
        self.tensor_tags[target_tensor_name] = tensor_tag
        tensor_tag += 1
    # Set a tag for the ack tensor
    self.tensor_tags["ack"] = tensor_tag
    tensor_tag += 1

Input is:

target_tensor_names = {set: 2} {'target_length', 'target'}
 {str} 'target_length'
 {str} 'target'
 __len__ = {int} 2

model = {list: 5}
 0 = {tuple: 3} (Stage0(...), ['input0', 'input1'], ['out2', 'out1'])
 1 = {tuple: 3} (Stage1(...), ['out2', 'input1', 'input2', 'out1'], ['out3', 'out7'])
 2 = {tuple: 3} (Stage2(...), ['out3', 'out7'], ['out8', 'out9', 'out10'])
 3 = {tuple: 3} (Stage3(...), ['out8', 'out9', 'out10'], ['out12'])
 4 = {tuple: 3} (LabelSmoothing(), ['out12'], ['loss'])
 __len__ = {int} 5
(the module internals are the same as shown in Section 3.3)

Get:

tensor_tags = {dict: 15}
 'input0' = {int} 1
 'input1' = {int} 2
 'out2' = {int} 3
 'out1' = {int} 4
 'input2' = {int} 5
 'out3' = {int} 6
 'out7' = {int} 7
 'out8' = {int} 8
 'out9' = {int} 9
 'out10' = {int} 10
 'out12' = {int} 11
 'loss' = {int} 12
 'target' = {int} 13
 'target_length' = {int} 14
 'ack' = {int} 15
 __len__ = {int} 15

4.2.2 Configuration Maps

Recall some of the definitions in the configuration file:

  • module_to_stage_map describes which stage each module of the model is assigned to.
  • stage_to_rank_map describes which ranks each stage corresponds to; a rank represents a specific worker process. For example, a stage may be data-parallel across several ranks.

Here is a sample file:

{ "module_to_stage_map": [0, 1, 2, 2], "stage_to_rank_map": {"0": [0, 1, 4, 5, 8, 9, 12, 13], "1": [2, 6, 10, 14], "2": [3, 7, 11, 15]}}Copy the code

For our model in this article, the configuration file is as follows:

{
    "module_to_stage_map": [0, 1, 2, 3, 3],
    "stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}

Loaded into memory, this becomes:

module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]
rank_to_stage_map = {dict: 4} {0: 0, 1: 1, 2: 2, 3: 3}

Because a reverse lookup is sometimes needed, the program also builds the reverse mappings, resulting in the following.

stage_to_module_map = {defaultdict: 4}
 default_factory = {type} <class 'list'>
 0 = {list: 1} [0]
 1 = {list: 1} [1]
 2 = {list: 1} [2]
 3 = {list: 2} [3, 4]
 __len__ = {int} 4
stage_to_rank_map = {dict: 4} 
 0 = {list: 1} [0]
 1 = {list: 1} [1]
 2 = {list: 1} [2]
 3 = {list: 1} [3]
 __len__ = {int} 4
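As a self-contained illustration (using the sample JSON for this article), the forward and reverse mappings can be derived as follows; PipeDream does the equivalent inside StageRuntime.initialize, shown in the next subsection.

import collections
import json

config = json.loads('{"module_to_stage_map": [0, 1, 2, 3, 3], '
                    '"stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}}')
module_to_stage_map = config["module_to_stage_map"]
stage_to_rank_map = {int(k): v for k, v in config["stage_to_rank_map"].items()}

# stage -> list of module indices
stage_to_module_map = collections.defaultdict(list)
for module_id, stage_id in enumerate(module_to_stage_map):
    stage_to_module_map[stage_id].append(module_id)

# rank -> stage
rank_to_stage_map = {rank: stage for stage, ranks in stage_to_rank_map.items()
                     for rank in ranks}

print(dict(stage_to_module_map))  # {0: [0], 1: [1], 2: [2], 3: [3, 4]}
print(rank_to_stage_map)          # {0: 0, 1: 1, 2: 2, 3: 3}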

4.2.3 Finding Its Own Configuration

Since local_rank and rank are set on the command line of each machine, the runtime uses its rank to look up its own entries in the configuration file and configure itself further.

stage_to_module_map = collections.defaultdict(list)
for module in range(len(module_to_stage_map)):
    # Build the stage -> modules mapping
    stage_to_module_map[module_to_stage_map[module]].append(module)

rank_to_stage_map = {}
for stage in stage_to_rank_map:
    for rank in stage_to_rank_map[stage]:
        # Build the rank -> stage mapping
        rank_to_stage_map[rank] = stage

# Now, use this mapping to determine the modules contained in
# each stage.
assert 0 <= self.rank < len(rank_to_stage_map)
self.num_ranks = len(rank_to_stage_map)             # total number of ranks
self.num_stages = len(stage_to_module_map)          # total number of stages
self.stage = rank_to_stage_map[self.rank]           # the stage this rank belongs to
self.rank_in_stage = stage_to_rank_map[self.stage].index(self.rank)  # index of this rank within its stage
self.num_ranks_in_stage = len(stage_to_rank_map[self.stage])  # number of data-parallel ranks in this stage
self.num_ranks_in_first_stage = len(stage_to_rank_map[0])
self.num_ranks_in_previous_stage = 0
self.ranks_in_previous_stage = []
if self.stage > 0:
    self.num_ranks_in_previous_stage = len(
        stage_to_rank_map[self.stage - 1])
    self.ranks_in_previous_stage = stage_to_rank_map[self.stage - 1]
self.num_ranks_in_next_stage = 0
self.ranks_in_next_stage = []
if self.stage < self.num_stages - 1:
    self.num_ranks_in_next_stage = len(
        stage_to_rank_map[self.stage + 1])
    self.ranks_in_next_stage = stage_to_rank_map[self.stage + 1]

modules = stage_to_module_map[self.stage]  # the modules of this stage
self.modules_with_dependencies = ModulesWithDependencies(
    [model[module] for module in modules])
self.is_criterion = self.stage == (self.num_stages - 1)
if stage_to_depth_map is not None:
    self.num_warmup_minibatches = stage_to_depth_map[
        str(self.stage)]
else:
    self.num_warmup_minibatches = self.num_ranks - 1
    for i in range(self.stage):
        self.num_warmup_minibatches -= len(
            stage_to_rank_map[i])
    self.num_warmup_minibatches = self.num_warmup_minibatches // \
        self.num_ranks_in_stage

Variables are as follows:

self = {StageRuntime}
 backward_minibatch_id = {int} 0
 criterion_input_name = {str} 'out12'
 distributed_backend = {NoneType} None
 eval_tensor_shapes = {dict: 13} {'input0': (50, 128), 'input1': (128,), 'input2': (50, 128), 'target': (6400,), 'target_length': (128,), 'out2': (50, 128, 1024), 'out1': (50, 128, 1024), 'out3': (50, 128, 1024), 'out7': (50, 128, 1024), 'out8': (50, 128, 1024), 'out9': (50, 128, 1024), ...}
 forward_minibatch_id = {int} 0
 fp16 = {bool} False
 gradients = {dict: 0} {}
 is_criterion = {bool} True
 local_rank = {int} 3
 loss_scale = {int} 1
 model_type = {str} 'translation'
 modules_with_dependencies = {ModulesWithDependencies}
  _all_input_names = {list: 2} [['out8', 'out9', 'out10'], ['out12']]
  _all_output_names = {list: 2} [['out12'], ['loss']]
  _modules = {list: 2}
   0 = {Stage3} Stage3(
         (layer5): LSTM(2048, 1024)
         (layer8): Classifier(
           (classifier): Linear(in_features=1024, out_features=32320, bias=True)
         )
       )
   1 = {LabelSmoothing} LabelSmoothing()
   __len__ = {int} 2
 num_ranks = {int} 4
 num_ranks_in_first_stage = {int} 1
 num_ranks_in_next_stage = {int} 0
 num_ranks_in_previous_stage = {int} 1
 num_ranks_in_stage = {int} 1
 num_stages = {int} 4
 num_warmup_minibatches = {int} 0
 rank = {int} 3
 rank_in_stage = {int} 0
 ranks_in_next_stage = {list: 0} []
 ranks_in_previous_stage = {list: 1} [2]
 receive_ranks = {dict: 0} {}
 send_ranks = {dict: 0} {}
 stage = {int} 3
 target_tensor_names = {set: 2} {'target', 'target_length'}
 tensor_tags = {dict: 15} {'input0': 1, 'input1': 2, 'out2': 3, 'out1': 4, 'input2': 5, 'out3': 6, 'out7': 7, 'out8': 8, 'out9': 9, 'out10': 10, 'out12': 11, 'loss': 12, 'target': 13, 'target_length': 14, 'ack': 15}
 tensors = {list: 0} []
 training_tensor_dtypes = {dict: 13} {'input0': torch.int64, 'input1': torch.int64, 'input2': torch.int64, 'target': torch.int64, 'target_length': torch.int32, 'out2': torch.float32, 'out1': torch.float32, 'out3': torch.float32, 'out7': torch.float32, 'out8': torch.float32, 'out9': torch.float32, ...}
 training_tensor_shapes = {dict: 13} {'input0': (50, 128), 'input1': (128,), 'input2': (50, 128), 'target': (6400,), 'target_length': (128,), 'out2': (50, 128, 1024), 'out1': (50, 128, 1024), 'out3': (50, 128, 1024), 'out7': (50, 128, 1024), 'out8': (50, 128, 1024), 'out9': (50, 128, 1024), ...}

Let’s look at how a few variables are used.

4.2.3.1 num_ranks

First, let's see how num_ranks is used. It appears in later code, for example:

self.num_warmup_minibatches = self.num_ranks - 1  # the number of warmup minibatches is derived from num_ranks
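As a worked example (assuming the 4-rank, one-rank-per-stage configuration used throughout this article), the warmup count computed in initialize() comes out to 0 for stage 3, matching the num_warmup_minibatches = 0 in the variable dump above:

# Same formula as the else branch of initialize(), evaluated for rank 3 / stage 3.
stage_to_rank_map = {0: [0], 1: [1], 2: [2], 3: [3]}
num_ranks, stage, num_ranks_in_stage = 4, 3, 1

num_warmup_minibatches = num_ranks - 1                    # 3
for i in range(stage):
    num_warmup_minibatches -= len(stage_to_rank_map[i])   # subtract 1 per preceding stage -> 0
num_warmup_minibatches //= num_ranks_in_stage             # 0
print(num_warmup_minibatches)                             # 0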
4.2.3.2 rank_in_stage

Second, how is rank_in_stage used?

As shown earlier:

self.rank_in_stage = stage_to_rank_map[self.stage].index(self.rank)  # index of this rank within its stage

rank_in_stage is passed on to the communication module:

self.comm_handler.initialize(
    self.receive_ranks,
    self.send_ranks,
    self.tensor_tags,
    self.target_tensor_names,
    self.training_tensor_dtypes,
    self.rank_in_stage,          # passed in here; inside the function it identifies this replica within the stage
    self.num_ranks_in_stage,
    self.ranks_in_previous_stage,
    self.ranks_in_next_stage)

4.2.4 Setting the Communication Module

Next, configure the communication module.

# To determine where tensors should be sent and received, first
# determine the "producing" and "consuming" module IDs of each
# tensor. We then use the corresponding machine ranks to send
# and receive tensors.
master_port = 12345
self.comm_handler = communication.CommunicationHandler(
    master_addr=master_addr,
    master_port=master_port,
    rank=self.rank,
    local_rank=self.local_rank,
    num_ranks_in_server=num_ranks_in_server,
    world_size=self.num_ranks,
    fp16=self.fp16,
    backend=self.distributed_backend)

This code builds the CommunicationHandler, which serves the later "setting producers and consumers" step, so the related code is shown together here:

else:
    ......

    # To determine where tensors should be sent and received, first
    # determine the "producing" and "consuming" module IDs of each
    # tensor. We then use the corresponding machine ranks to send
    # and receive tensors.
    master_port = 12345
    self.comm_handler = communication.CommunicationHandler(
        master_addr=master_addr,
        master_port=master_port,
        rank=self.rank,
        local_rank=self.local_rank,
        num_ranks_in_server=num_ranks_in_server,
        world_size=self.num_ranks,
        fp16=self.fp16,
        backend=self.distributed_backend)

    # Set up the producers and consumers (analyzed in detail below).
    # Set up the receive/send ranks.
    for i in range(len(model)):
        for j in range(i+1, len(model)):
            for tensor_name in model[i][2]:
                if tensor_name in model[j][1]:
                    if module_to_stage_map[i] == \
                        module_to_stage_map[j]:
                        continue
                    # For now, assume that each stage is served by only
                    # a single machine.
                    if module_to_stage_map[j] == self.stage:
                        self.receive_ranks[tensor_name] = \
                            stage_to_rank_map[module_to_stage_map[i]]
                    if module_to_stage_map[i] == self.stage:
                        self.send_ranks[tensor_name] = \
                            stage_to_rank_map[module_to_stage_map[j]]

    # Set up send/receive ranks for the model inputs.
    for model_inputs in inputs_module_destinations.keys():
        destination_stage = module_to_stage_map[
            inputs_module_destinations[model_inputs]]
        if destination_stage > self.stage:
            self.send_ranks[model_inputs] = \
                self.ranks_in_next_stage

        if 0 < self.stage <= destination_stage:
            self.receive_ranks[model_inputs] = \
                self.ranks_in_previous_stage

        if destination_stage > 0:
            if model_inputs not in self.tensor_tags:
                self.tensor_tags[model_inputs] = tensor_tag
                tensor_tag += 1

4.2.5 Setting Producers and Consumers

Next, the send and receive ranks are configured: receive_ranks and send_ranks record, for each tensor of this stage, the ranks it is received from and the ranks it is sent to.

As mentioned earlier, PyTorch had not released a stable RPC at the time PipeDream was developed, so PipeDream (a 2019 paper) had to implement its own communication logic, i.e., its own distributed computational graph. Producers and consumers are an important part of that graph.

The logical abstraction is as follows:

  • Traverse the modules of the model; call the current one model[i]. Note that model[i] here is a concrete submodule. A stage can contain multiple layers, such as [layer1, layer2, layer3], and that stage can be replicated across multiple ranks for data parallelism; for example, rank 1 and rank 2 both run [layer1, layer2, layer3].

  • For each model[i], iterate over the modules after it; call one of them model[j].

  • Iterate over the outputs of model[i]; call one of them tensor_name.

    • If tensor_name is also among the inputs of model[j], i.e., it is an output of model[i] and an input of model[j], then the two can be connected. If a tensor only has an input side or only an output side, there is no need to establish any communication mechanism for it.

      • If model[i] and model[j] are in the same stage, i.e., on the same node (or on several nodes controlled by DDP), no communication mechanism is needed.

      • If tensor_name is an input of model[j] and model[j] is located on this node, then this node's receive_ranks includes model[j]'s input (and possibly the inputs of other modules as well).

        • So the ranks that tensor_name is received from include the ranks corresponding to model[i].
      • If tensor_name is an output of model[i] and model[i] is located on this node, then this node's send_ranks includes model[i]'s output (and possibly the outputs of other modules as well).

        • So the ranks that tensor_name is sent to include the ranks corresponding to model[j].

The specific code is as follows:

# To determine where tensors should be sent and received, first
# determine the "producing" and "consuming" module IDs of each
# tensor. We then use the corresponding machine ranks to send
# and receive tensors.
for i in range(len(model)):                 # traverse the modules of the model
    for j in range(i+1, len(model)):        # traverse the modules after model[i]
        for tensor_name in model[i][2]:     # traverse the outputs of model[i]
            # If tensor_name is also an input of model[j], i.e., an output of
            # model[i] and an input of model[j], the two can be connected
            if tensor_name in model[j][1]:
                if module_to_stage_map[i] == \
                    module_to_stage_map[j]:  # the two modules are on the same stage
                    continue
                # For now, assume that each stage is served by only
                # a single machine.
                # tensor_name is an input of module[j], and module[j] is on this
                # node, so it goes into this node's receive_ranks
                if module_to_stage_map[j] == self.stage:
                    # the ranks tensor_name is received from include the ranks of module[i]
                    self.receive_ranks[tensor_name] = \
                        stage_to_rank_map[module_to_stage_map[i]]
                # tensor_name is an output of module[i], and module[i] is on this
                # node, so it goes into this node's send_ranks
                if module_to_stage_map[i] == self.stage:
                    # the ranks tensor_name is sent to include the ranks of module[j]
                    self.send_ranks[tensor_name] = \
                        stage_to_rank_map[module_to_stage_map[j]]

for model_inputs in inputs_module_destinations.keys():
    destination_stage = module_to_stage_map[
        inputs_module_destinations[model_inputs]]
    if destination_stage > self.stage:
        self.send_ranks[model_inputs] = \
            self.ranks_in_next_stage

    if 0 < self.stage <= destination_stage:
        self.receive_ranks[model_inputs] = \
            self.ranks_in_previous_stage

    if destination_stage > 0:
        if model_inputs not in self.tensor_tags:
            self.tensor_tags[model_inputs] = tensor_tag
            tensor_tag += 1

The obtained variables are as follows:

num_ranks = {int} 4
num_ranks_in_first_stage = {int} 1
num_ranks_in_next_stage = {int} 0
num_ranks_in_previous_stage = {int} 1
num_ranks_in_stage = {int} 1
num_stages = {int} 4
num_warmup_minibatches = {int} 0
rank = {int} 3
rank_in_stage = {int} 0
ranks_in_next_stage = {list: 0} []
ranks_in_previous_stage = {list: 1} [2]
receive_ranks = {dict: 3}   # the ranks this stage receives tensors from
 'out8' = {list: 1} [2]
 'out9' = {list: 1} [2]
 'out10' = {list: 1} [2]
 __len__ = {int} 3
send_ranks = {dict: 0} {}   # the ranks this stage sends tensors to
 __len__ = {int} 0
stage = {int} 3
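To see where these values come from, here is a self-contained sketch (using the GNMT partition from Section 3.1 and this article's 4-rank configuration, evaluated for rank 3 / stage 3) that reproduces the producer/consumer loop above and prints the same receive_ranks and send_ranks:

model_io = [  # (inputs, outputs) of each module, from models/gnmt/gpus=4/__init__.py
    (["input0", "input1"], ["out2", "out1"]),
    (["out2", "input1", "input2", "out1"], ["out3", "out7"]),
    (["out3", "out7"], ["out8", "out9", "out10"]),
    (["out8", "out9", "out10"], ["out12"]),
    (["out12"], ["loss"]),
]
module_to_stage_map = [0, 1, 2, 3, 3]
stage_to_rank_map = {0: [0], 1: [1], 2: [2], 3: [3]}
my_stage = 3

receive_ranks, send_ranks = {}, {}
for i in range(len(model_io)):
    for j in range(i + 1, len(model_io)):
        for tensor_name in model_io[i][1]:           # outputs of module i ...
            if tensor_name in model_io[j][0]:        # ... consumed by module j
                if module_to_stage_map[i] == module_to_stage_map[j]:
                    continue                         # same stage: no communication needed
                if module_to_stage_map[j] == my_stage:
                    receive_ranks[tensor_name] = stage_to_rank_map[module_to_stage_map[i]]
                if module_to_stage_map[i] == my_stage:
                    send_ranks[tensor_name] = stage_to_rank_map[module_to_stage_map[j]]

print(receive_ranks)  # {'out8': [2], 'out9': [2], 'out10': [2]}
print(send_ranks)     # {}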

4.2.6 Setting Modules

Next come the module-related operations. Specifically:

  • First, ModulesWithDependencies is used to further process the model and record each module's inputs and outputs.
  • Then cuda() is called to move the modules and parameters to the GPU.
  • If required, the modules are converted for FP16.

Here we focus on the ModulesWithDependencies part.

First, obtain the module indices corresponding to this stage.

modules = stage_to_module_map[self.stage]  # here we get [3, 4], which is used later

stage_to_module_map holds the stage-to-modules relationship, which lets us look up the modules corresponding to this stage.

Recall from the configuration file that this stage (value 3) corresponds to the two modules with indices 3 and 4, i.e., the two trailing 3s below:

module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]

Next, we need to get the modules of this stage through the following code, including the input and output of each module.

    modules = self.modules_with_dependencies.modules()
    for i in range(len(modules)):
        modules[i] = modules[i].cuda()
        if self.fp16:
            import apex.fp16_utils as fp16_utils
            modules[i] = fp16_utils.BN_convert_float(modules[i].half())

The ModulesWithDependencies class is as follows:

class ModulesWithDependencies:
    def __init__(self, modules_with_dependencies):
        self._modules = []
        self._all_input_names = []
        self._all_output_names = []
        for (module, input_names, output_names) in modules_with_dependencies:
            self._modules.append(module)
            self._all_input_names.append(input_names)
            self._all_output_names.append(output_names)
​
    def modules(self):
        return self._modules
​
    def all_input_names(self):
        return self._all_input_names
​
    def all_output_names(self):
        return self._all_output_names
​
    def is_input_tensor(self, tensor_name):
        for module_input_names in self._all_input_names:
            if tensor_name in module_input_names:
                return True
        return False

4.2.7 Setting Groups

Next, set up a process group for the data-parallel ranks of each stage.

ranks here are the data-parallel ranks of a stage; for example, stage 0 below corresponds to [0, 1, 2]:

{" module_to_stage_map ": [0, 1, 1]," stage_to_rank_map ": {" 0" : [0, 1, 2], "1" : [3]} # rank of each stage, the purpose is to get a parallel machines here}Copy the code

We iterate through the stages, calling new_group() for each one to create a process group. new_group() creates a new process group from an arbitrary subset of all processes and returns a group handle that can be used as the group argument of collective communication functions (which exchange information in particular communication patterns).

This is the point raised at the beginning: for data parallelism, each stage needs to create and manage its own process group.

# Initialize all groups in the same order on every worker.
if stage_to_rank_map is not None:
    groups = []
    for stage in range(self.num_stages):        # traverse the stages
        ranks = stage_to_rank_map[stage]        # the ranks of this stage, e.g., [0, 1, 2]
        if len(ranks) > 1:                      # more than one rank: build a process group for DDP
            groups.append(dist.new_group(ranks=ranks))
        else:
            groups.append(None)
    group = groups[self.stage]
else:
    group = None

4.2.8 Configuring Data Parallelism

Finally, DistributedDataParallel is applied. The parameter process_group=group comes from the "Setting Groups" step above.

This creates a distributed data-parallel wrapper for each replicated stage.

# self.modules_with_dependencies contains a list of PyTorch
# modules, along with a list of user-defined input and output
# tensor names. We use our module_executor.ModuleExecutor
# class to wrap these dependencies, and use run_forward and
# run_backward methods downstream.
num_parameters = 0
for i in range(len(modules)):
    if group is not None:
        if ((i < (len(modules)-1) and self.is_criterion)
            or not self.is_criterion):
            num_parameters += \
                sum(x.size()[0] * x.size()[1]
                    if len(x.size()) > 1 else x.size()[0]
                    for x in modules[i].parameters() if x.size())
            # Wrap the module with DistributedDataParallel for data parallelism
            modules[i] = torch.nn.parallel.DistributedDataParallel(
                modules[i],
                process_group=group,
                device_ids=[local_rank],
                output_device=local_rank)
if self.num_ranks_in_stage > 1:
    module_size = 4. * num_parameters
    print("Replicating stage: ranks=%d, module_size=%.3f" % (
        self.num_ranks_in_stage, module_size))

As for gradient distribution, we will analyze it in a special series in the future.

4.2.9 Initializing the Communication Module

Finally, initialize the communication module.

        if self.comm_handler is not None:
            self.comm_handler.initialize(
                self.receive_ranks,
                self.send_ranks,
                self.tensor_tags,
                self.target_tensor_names,
                self.training_tensor_dtypes,
                self.rank_in_stage,
                self.num_ranks_in_stage,
                self.ranks_in_previous_stage,
                self.ranks_in_next_stage)

Let’s use the picture from the paper as an example:

If further refined for this article, it is:

+-----------------------------+
| Stage 1 StageRuntime        |
|  CommunicationHandler       |
|  +-----------------------+  |
|  | Rank 1                |  |
|  | Layer 1 ---> Layer 2  |  |
|  +-----------------------+  |
+--------------+--------------+
               |
               v
+-----------------------------+
| Stage 2 StageRuntime        |
|  CommunicationHandler       |
|  DDP                        |
|  +-----------------------+  |
|  | Rank 2                |  |
|  | Layer 3 ---> Layer 4  |  |
|  +-----------------------+  |
|  +-----------------------+  |
|  | Rank 3                |  |
|  | Layer 3 ---> Layer 4  |  |
|  +-----------------------+  |
|  +-----------------------+  |
|  | Rank 4                |  |
|  | Layer 3 ---> Layer 4  |  |
|  +-----------------------+  |
+--------------+--------------+
               |
               v
+-----------------------------+
| Stage 3 StageRuntime        |
|  CommunicationHandler       |
|  +-----------------------+  |
|  | Layer 5 ---> Layer 6  |  |
|  +-----------------------+  |
+-----------------------------+


4.3 Member Functions

We only introduce the basic functions here. Several other business functions, such as run_forward, are covered in the 1F1B article.

The following functions all do their work by calling the communication module.

4.3.1 receive_tensors_forward

receive_tensors_forward obtains tensors from the preceding stage during forward propagation.

During forward propagation, tensors are recorded in this instance's self.tensors.


def receive_tensors_forward(self):
    if self.forward_only and len(self.tensors) > 0:
        self.tensors.pop(0)   # drop the previous minibatch's tensors
    self.tensors.append({})
    if self.loader_iter is not None:   # load new input from the data loader
        input = next(self.loader_iter)
        if self.model_type == TRANSLATION:
            (input, target) = input
            src, src_length = input
            tgt, tgt_length = target

            self.tensors[-1]["input0"] = src.cuda(non_blocking=True)
            self.tensors[-1]["input1"] = torch.LongTensor(src_length).cuda(
                non_blocking=True)
            self.tensors[-1]["input2"] = tgt[:-1].cuda(non_blocking=True)
            self.tensors[-1]["target"] = tgt[1:].cuda().contiguous().view(-1)
            self.tensors[-1]["target_length"] = \
                torch.tensor([int(sum(torch.LongTensor(tgt_length) - 1))],
                             dtype=torch.int).cuda()
        elif self.model_type == IMAGE_CLASSIFICATION:
            (input, target) = input
            if self.fp16:
                input = input.half()
            self.tensors[-1]["input0"] = input.cuda(non_blocking=True)
            self.tensors[-1]["target"] = target.cuda(non_blocking=True)
        elif self.model_type == SPEECH_TO_TEXT:
            input, target, input_percentages, target_sizes = input
            input_sizes = input_percentages.mul_(int(input.size(3))).int()
            self.tensors[-1]["input0"] = input.cuda(non_blocking=True)
            self.tensors[-1]["input1"] = input_sizes.cuda(non_blocking=True)
            self.tensors[-1]["target"] = target.cuda(non_blocking=True)
            self.tensors[-1]["target_length"] = target_sizes.cuda(
                non_blocking=True)
    else:
        # Receive all required tensors from upstream machines.
        for input_name in self.receive_ranks:   # traverse the receive ranks
            if input_name == "ack":
                continue

            self.tensors[-1][input_name] = \
                self.comm_handler.recv(
                    input_name,
                    forward_minibatch_id=self.forward_minibatch_id,
                    backward_minibatch_id=self.backward_minibatch_id,
                    backward=False)

            self.forward_stats.stats['receive_tensors_size'] += \
                (self.tensors[-1][input_name].element_size() *
                 self.tensors[-1][input_name].nelement())

        # Used to track where to receive forward from.
        self.comm_handler.increment_messaging_index(
            sending=False)

4.3.2 send_tensors_forward

send_tensors_forward sends tensors to the following stage during forward propagation.

def send_tensors_forward(self):
    # Send all required tensors downstream.
    for output_name in self.send_ranks:   # traverse the send ranks
        if output_name == "ack":
            continue

        self.comm_handler.send(
            output_name,
            self.tensors[-1][output_name],
            forward_minibatch_id=self.forward_minibatch_id,
            backward_minibatch_id=self.backward_minibatch_id,
            backward=False)

        self.forward_stats.stats['send_tensors_size'] += \
            (self.tensors[-1][output_name].element_size() *
             self.tensors[-1][output_name].nelement())

4.3.3 receive_tensors_backward

During backward propagation, gradients are kept in self.gradients.

receive_tensors_backward obtains gradient tensors from downstream during backward propagation.

Note that the ranks involved are self.send_ranks: they are the send ranks in the forward pass and become the receive ranks in the backward pass.

def receive_tensors_backward(self):
    # Receive all required gradients from downstream
    # machines.
    for output_name in self.send_ranks:   # the forward-pass send ranks act as receive ranks here
        if output_name in self.target_tensor_names:
            continue

        self.gradients[output_name] = \
            self.comm_handler.recv(
                output_name,
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=True)

        self.backward_stats.stats['receive_tensors_size'] += \
            (self.gradients[output_name].element_size() *
             self.gradients[output_name].nelement())

4.3.4 send_tensors_backward

During backward propagation, gradients are kept in self.gradients.

send_tensors_backward sends gradient tensors upstream during backward propagation.

Note that the ranks involved are self.receive_ranks: they are the receive ranks in the forward pass and become the send ranks in the backward pass.

def send_tensors_backward(self):
    # Send all required gradients upstream.
    for input_name in self.receive_ranks:   # the forward-pass receive ranks act as send ranks here
        if input_name in self.target_tensor_names:
            continue

        self.comm_handler.send(
            input_name,
            self.gradients[input_name],
            forward_minibatch_id=self.forward_minibatch_id,
            backward_minibatch_id=self.backward_minibatch_id,
            backward=True)

        self.backward_stats.stats['send_tensors_size'] += \
            (self.gradients[input_name].element_size() *
             self.gradients[input_name].nelement())

    if self.num_ranks_in_previous_stage > 0:
        # Used to track where to send tensors in the
        # backward pass.
        self.comm_handler.increment_messaging_index(
            sending=True)

4.3.5 run_ack

run_ack sends and receives acknowledgements to and from the previous and next stages.

​
    def run_ack(self):
        # No need for ack if running on a single worker.
        if self.rank is None:
            return
​
        # Receive ack from next stage. Send ack to previous stage.
        if self.stage < (self.num_stages-1):
            self.comm_handler.recv(
                "ack",
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=True)
        if self.stage > 0:
            self.comm_handler.send(
                "ack",
                torch.zeros(self.tensor_shapes["ack"],
                            dtype=torch.int64).cuda(),
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=True)
​
            # Used to track where to receive forward from.
            self.comm_handler.increment_messaging_index(sending=True)
​
        self.backward_minibatch_id += 1

At this point, we cover the runtime engine with its static information and initialization, and next we cover the communication module.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Pytorch.org/docs/stable…
