Background

One notable difference between machine learning workloads and traditional workloads is their heavy demand for GPUs. As discussed in previous articles (mp.weixin.qq.com/s/Nasm-cXLt… and mp.weixin.qq.com/s/X4VDynLfK…), GPU memory can no longer keep up with the growth in model parameters. The problem has become more pronounced with the emergence of new model architectures such as the Transformer. As algorithm engineers need more and more resources to train models, distributed training has become the standard way to train models in industry.

Elastic training can dynamically adjust the number of training instances while a job is running, which greatly improves the utilization of cluster resources. Combined with resource types such as cloud spot instances, it also allows model tuning at a lower cost, further reducing costs and increasing efficiency. In PyTorch's latest release, version 1.9.0, the original distributed training launcher torch.distributed.launch is being deprecated in favor of the elastic distributed training interface torch.distributed.run.

We take this opportunity to give a brief introduction to the new feature and a brief comparison with Horovod Elastic. Finally, we summarize the issues to watch out for when using elastic training.

PyTorch design prior to 1.9.0

PyTorch is one of the most popular deep learning frameworks, and it is best known for its ease of use. It provides a concise API for both single-machine and distributed training.

Prior to PyTorch 1.9.0, distributed training was usually done in the following way.

python -m torch.distributed.launch \
        --nnodes=NODE_SIZE \
        --nproc_per_node=TRAINERS_PER_NODE \
        --node_rank=NODE_RANK \
        --master_port=HOST_PORT \
        --master_addr=HOST_NODE_ADDR \
        YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

nnodes is the number of nodes participating in training, and nproc_per_node is the number of processes to run on each node. node_rank is the identifier of the current node, and master_addr and master_port are the address and port that the master listens on. torch.distributed.launch sets several environment variables, including WORLD_SIZE, MASTER_PORT, and MASTER_ADDR.

It then creates the corresponding training processes on the current machine. The current machine runs TRAINERS_PER_NODE processes, which form a local worker group. With NODE_SIZE machines participating, there are NODE_SIZE * TRAINERS_PER_NODE processes in total. To start a distributed training job, you need to run the command on every machine.
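As a rough illustration of this bookkeeping, the sketch below derives the environment that each local process would see. The helper name build_worker_envs is hypothetical, and the rank formula (node rank times processes per node, plus local rank) is an assumption about how a launcher of this kind numbers its processes, not a copy of the PyTorch source.

```python
from typing import Dict, List


def build_worker_envs(
    nnodes: int,
    nproc_per_node: int,
    node_rank: int,
    master_addr: str,
    master_port: int,
) -> List[Dict[str, str]]:
    """Return one environment dict per local process (hypothetical helper)."""
    world_size = nnodes * nproc_per_node
    envs = []
    for local_rank in range(nproc_per_node):
        envs.append(
            {
                "MASTER_ADDR": master_addr,
                "MASTER_PORT": str(master_port),
                "WORLD_SIZE": str(world_size),
                # Assumed numbering: processes are ranked node by node.
                "RANK": str(node_rank * nproc_per_node + local_rank),
                "LOCAL_RANK": str(local_rank),
            }
        )
    return envs
```

With 2 nodes and 4 processes per node, the second node (node_rank 1) would hold global ranks 4 through 7 under this numbering.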

New design in PyTorch 1.9.0

In PyTorch 1.9, torch.distributed.launch is being deprecated and replaced by torch.distributed.run, which is based on PyTorch Elastic. Usage changes somewhat compared with the previous approach, as shown below.

python -m torch.distributed.run \
        --nnodes=MIN_SIZE:MAX_SIZE \
        --nproc_per_node=TRAINERS_PER_NODE \
        --rdzv_id=JOB_ID \
        --rdzv_backend=c10d \
        --rdzv_endpoint=HOST_NODE_ADDR \
        YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

It provides several new capabilities. First, better fault tolerance: when a worker fails, the workers are restarted automatically and training continues. Second, RANK and WORLD_SIZE no longer need to be set manually. Last but not least, elastic training is supported, so the number of workers participating in training can grow or shrink dynamically. In the example above, nnodes is no longer a fixed value but a range, and the training job can tolerate changes in the number of workers within that range.
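To make the range form of nnodes concrete, here is a minimal sketch of how a launcher might turn the argument into a pair of bounds. The function name parse_nnodes and its validation rules are assumptions made for illustration; they are not taken from the PyTorch source.

```python
from typing import Tuple


def parse_nnodes(value: str) -> Tuple[int, int]:
    """Parse "N" or "MIN:MAX" into a (min_nodes, max_nodes) pair."""
    parts = value.split(":")
    if len(parts) == 1:
        # A single number means a fixed-size job: min and max coincide.
        min_nodes = max_nodes = int(parts[0])
    elif len(parts) == 2:
        min_nodes, max_nodes = int(parts[0]), int(parts[1])
    else:
        raise ValueError(f"expected N or MIN:MAX, got {value!r}")
    if not 0 < min_nodes <= max_nodes:
        raise ValueError(f"invalid node range: {value!r}")
    return min_nodes, max_nodes
```

A job started with a range such as 1:4 can then keep running as long as the number of nodes stays between the two bounds.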

The training code also needs to be modified to support elasticity.

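import sys

import torch.distributed

# parse_args, load_checkpoint, initialize, train and save_checkpoint
# are helpers that the user is expected to provide.
def main():
    args = parse_args(sys.argv[1:])
    # Resume from the latest checkpoint, if there is one
    state = load_checkpoint(args.checkpoint_path)
    initialize(state)
    # torch.distributed.run ensures that this will work
    # by exporting all the env vars needed to initialize the process group
    torch.distributed.init_process_group(backend=args.backend)
    for i in range(state.epoch, state.total_num_epochs):
        for batch in iter(state.dataset):
            train(batch, state.model)
        state.epoch += 1
        # Checkpoint after every epoch so a restart loses at most one epoch
        save_checkpoint(state)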
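The training loop above calls load_checkpoint and save_checkpoint without showing them. The sketch below is one possible shape for these helpers, using only the standard library. The State fields and the JSON file format are assumptions made for illustration, and the path is passed explicitly here. A real job would typically store model and optimizer state with the framework's own serialization.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass, field
from typing import List


@dataclass
class State:
    """Hypothetical training state; a real one would hold model and optimizer state."""
    epoch: int = 0
    total_num_epochs: int = 3
    weights: List[float] = field(default_factory=list)


def save_checkpoint(state: State, path: str) -> None:
    # Write to a temporary file first, then rename, so that a worker killed
    # mid-write never leaves a truncated checkpoint behind.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump(asdict(state), f)
    os.replace(tmp_path, path)


def load_checkpoint(path: str) -> State:
    # A missing file means this is the first attempt: start from scratch.
    if not os.path.exists(path):
        return State()
    with open(path) as f:
        return State(**json.load(f))
```

Because every worker is restarted after a failure, each restarted process goes through load_checkpoint again and resumes from the last saved epoch.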

One obvious change is that users need to save checkpoints themselves. When a worker fails, all workers are restarted, so a checkpoint mechanism is needed to make sure training can resume after the restart. This new distributed training approach introduces a number of new concepts, including the agent and rendezvous. We introduce these designs next, starting with torch.distributed.run, the entry point that users interact with.

def run(args):
    if args.standalone:
        args.rdzv_backend = "c10d"
        args.rdzv_endpoint = "localhost:29400"
        args.rdzv_id = str(uuid.uuid4())
        log.info(
            f"\n**************************************\n"
            f"Rendezvous info:\n"
            f"--rdzv_backend={args.rdzv_backend} "
            f"--rdzv_endpoint={args.rdzv_endpoint} "
            f"--rdzv_id={args.rdzv_id}\n"
            f"**************************************\n"
        )
    config, cmd, cmd_args = config_from_args(args)
    elastic_launch(
        config=config,
        entrypoint=cmd,
    )(*cmd_args)

There are two modes: standalone mode and distributed mode. Standalone mode is a special case of distributed mode. It is a convenience for running multiple workers on a single machine, so parameters such as rdzv_backend and rdzv_endpoint do not need to be set.

Both modes end up launching the actual training processes via elastic_launch. elastic_launch manages the life cycle of the workers through an elastic agent and returns the output of each worker.

class elastic_launch:
    ...
    def __call__(self, *args):
        return launch_agent(self._config, self._entrypoint, list(args))


def launch_agent(
    config: LaunchConfig,
    entrypoint: Union[Callable, str, None],
    args: List[Any],
) -> Dict[int, Any]:
    ...
    agent = LocalElasticAgent(
        spec=spec, start_method=config.start_method, log_dir=config.log_dir
    )
    ...
    result = agent.run()
    ...
    return result.return_values

Elastic Agent: How to manage multiple worker processes

An elastic agent is an independent process that manages the workers on its node. It plays a role similar to the supervisor in a process management system and makes sure each worker is set up correctly at startup. Since WORLD_SIZE and RANK no longer have to be provided by the user, the elastic agent takes care of them.

Worker failures are also captured and handled by the elastic agent. The elastic agent is arguably the most important abstraction in elastic training.

This illustration shows how Elastic Agent works.

Rendezvous is used by the elastic agents to discover each other and to synchronize membership changes among workers. Failures during training are captured by monitoring the worker processes. The core logic is wrapped in a single call, LocalElasticAgent.run().

    def run(self, role: str = DEFAULT_ROLE) -> RunResult:
        ...
        result = self._invoke_run(role)
        return result

    def _invoke_run(self, role: str = DEFAULT_ROLE) -> RunResult:
        ...
        self._initialize_workers(self._worker_group)
        while True:
            ...
            run_result = self._monitor_workers(self._worker_group)
            state = run_result.state
            ...
            if state == WorkerState.SUCCEEDED:
                ...
                return run_result
            elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED}:
                if self._remaining_restarts > 0:
                    ...
                    self._restart_workers(self._worker_group)
                else:
                    ...
                    return run_result
            elif state == WorkerState.HEALTHY:
                ...
                if num_nodes_waiting > 0:
                    ...
                    self._restart_workers(self._worker_group)
            else:
                raise Exception(f"[{role}] Worker group in {state.name} state")

As you can see, the core logic lives in _invoke_run. _initialize_workers does most of the initialization work, including assigning a rank to each worker. In the default implementation, the elastic agent and its worker processes run on the same machine, so self._monitor_workers(self._worker_group) uses multiprocessing to monitor the workers, and the loop handles each state differently.
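The control flow of this loop can be reduced to a small, self-contained model. The sketch below is a toy under stated assumptions: start_workers is a hypothetical callable that starts a fresh worker group and returns an iterator of polled states, and the membership-change branch (restarting healthy workers when new nodes are waiting) is left out. A real agent would also sleep between polls.

```python
import enum
from typing import Callable, Iterator


class WorkerState(enum.Enum):
    HEALTHY = "HEALTHY"
    FAILED = "FAILED"
    SUCCEEDED = "SUCCEEDED"


def supervise(
    start_workers: Callable[[], Iterator[WorkerState]],
    max_restarts: int,
) -> WorkerState:
    """Toy agent loop: poll the worker group and restart it on failure."""
    remaining_restarts = max_restarts
    polls = start_workers()
    while True:
        state = next(polls)
        if state is WorkerState.SUCCEEDED:
            return state
        if state is WorkerState.FAILED:
            if remaining_restarts == 0:
                # Restart budget exhausted: report the failure to the caller.
                return state
            remaining_restarts -= 1
            polls = start_workers()  # restart the whole worker group
        # HEALTHY: nothing to do, keep polling
```

The restart budget is what keeps a permanently broken job from looping forever: once it is used up, the failure is returned instead of triggering another restart.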

The elastic agent is highly extensible. In 1.9.0 there are three agent classes: ElasticAgent, SimpleElasticAgent, and LocalElasticAgent.

ElasticAgent is an abstract class. SimpleElasticAgent implements some of its functions. LocalElasticAgent is a concrete elastic agent that manages all worker processes on a single machine.

SimpleElasticAgent exists as a layer of abstraction that makes it easy to write a new agent implementation. For example, if you want a single agent to manage the workers on multiple machines rather than only the workers on the local machine, you can do so by extending SimpleElasticAgent.

Rendezvous design: How to determine RANK between different nodes

Next, we look at the other core abstraction, rendezvous. To support elastic training, membership changes among workers must be handled dynamically. Rendezvous is the synchronization component that implements this.

The core method of rendezvous is:

    @abstractmethod
    def next_rendezvous(
        self,
    ) -> Tuple[Store, int, int]:
        """Main entry-point into the rendezvous barrier.
        Blocks until the rendezvous is complete and the current process is
        included in the formed worker group, or a timeout occurs, or the
        rendezvous was marked closed.
        Returns:
            A tuple of :py:class:`torch.distributed.Store`, ``rank``, and
            ``world size``.
        Raises:
            RendezvousClosedError:
                The rendezvous is closed.
            RendezvousConnectionError:
                The connection to the rendezvous backend has failed.
            RendezvousStateError:
                The rendezvous state is corrupt.
            RendezvousTimeoutError:
                The rendezvous did not complete on time.
        """

As the docstring says, this call blocks until the required number of workers has joined. It is called when workers are initialized or restarted. When it returns, each participant uses the returned rank as its unique identifier. Rendezvous has four implementations: etcd, etcd-v2, c10d, and static.
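The blocking behavior can be pictured with an in-process toy. The sketch below uses threads in place of agents and a fixed group size in place of the min:max range. ToyRendezvous is a hypothetical class written for this article, not part of PyTorch, and it makes no attempt to clean up after a timeout.

```python
import threading
from typing import Tuple


class ToyRendezvous:
    """In-process stand-in for a rendezvous barrier (illustrative only)."""

    def __init__(self, num_nodes: int) -> None:
        self._num_nodes = num_nodes
        self._joined = 0
        self._cond = threading.Condition()

    def next_rendezvous(self, timeout: float = 5.0) -> Tuple[int, int]:
        """Block until num_nodes callers have joined; return (rank, world_size)."""
        with self._cond:
            rank = self._joined  # ranks are handed out in arrival order
            self._joined += 1
            if self._joined == self._num_nodes:
                # Last participant: release everyone who is waiting.
                self._cond.notify_all()
            elif not self._cond.wait_for(
                lambda: self._joined >= self._num_nodes, timeout=timeout
            ):
                raise TimeoutError("rendezvous did not complete on time")
            return rank, self._num_nodes
```

Every caller leaves the barrier with a distinct rank and the same world size, which is the property the rest of the training stack relies on.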

class EtcdRendezvousHandler(RendezvousHandler):
    def next_rendezvous(self):
        rdzv_version, rank, world_size = self._rdzv_impl.rendezvous_barrier()
        log.info("Creating EtcdStore as the c10d::Store implementation")
        store = self._rdzv_impl.setup_kv_store(rdzv_version)
        return store, rank, world_size

The etcd implementation used to be the recommended one, but it is no longer recommended now that c10d is available. In the etcd implementation, state is shared between workers through etcd's KV interface.

The process of determining the participating instances and corresponding ranks is shown in the figure below.

The first step is an attempt to write the value status: setup under /rdzv/active_version. Throughout the rendezvous, /rdzv/active_version serves both as the KV store for the intermediate state of the rendezvous and as an exclusive lock on it.

If the write fails, a rendezvous is already in progress.

If it succeeds, /rdzv/version_counter is incremented by one and a directory /rdzv/v_${version_counter} is created. Once these operations are complete, the status under /rdzv/active_version is changed to joinable and the join phase begins.
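The setup step can be sketched with a small in-memory stand-in for the KV store. ToyKV and init_rendezvous are hypothetical names, the store is single-process (so the exclusivity of create is only modeled, not enforced across machines), and starting the counter at 1 when it is absent is an assumption.

```python
import json
from typing import Dict


class KeyAlreadyExists(Exception):
    pass


class ToyKV:
    """Single-process stand-in for a KV store with create-if-absent."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def create(self, key: str, value: str) -> None:
        if key in self._data:
            raise KeyAlreadyExists(key)
        self._data[key] = value

    def get(self, key: str) -> str:
        return self._data[key]

    def set(self, key: str, value: str) -> None:
        self._data[key] = value


def init_rendezvous(kv: ToyKV) -> dict:
    """Try to start a new rendezvous round; otherwise report the current one."""
    try:
        # Only one agent can create the key, so it doubles as a lock.
        kv.create("/rdzv/active_version", json.dumps({"status": "setup"}))
    except KeyAlreadyExists:
        return json.loads(kv.get("/rdzv/active_version"))
    try:
        version = int(kv.get("/rdzv/version_counter")) + 1
    except KeyError:
        version = 1  # assumed starting value when no counter exists yet
    kv.set("/rdzv/version_counter", str(version))
    kv.create(f"/rdzv/v_{version}", "")
    state = {"status": "joinable", "version": version}
    kv.set("/rdzv/active_version", json.dumps(state))
    return state
```

The first caller creates the round and moves it to joinable. Any later caller fails the create and simply observes the state that the first one published.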

In the join phase, the agents, under the protection of the lock, add themselves one by one to the participants under /rdzv/active_version and are assigned increasing ranks. The rank here is not the global rank assigned to each worker process but the rank of the agent itself. The ranks of the worker processes are computed from the agent rank. This design is rather confusing, and we think there is room for improvement.
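One way to derive worker ranks from an agent rank is a prefix sum over the number of workers that each agent manages. The sketch below assumes that every agent knows these per-agent counts. The function name assign_worker_ranks is hypothetical, and the scheme is shown as an illustration rather than as the exact PyTorch logic.

```python
from typing import List


def assign_worker_ranks(agent_rank: int, local_world_sizes: List[int]) -> List[int]:
    """Global ranks for the workers managed by the agent with the given rank.

    local_world_sizes[i] is the number of workers managed by agent i.
    """
    # Workers of lower-ranked agents come first in the global numbering.
    base = sum(local_world_sizes[:agent_rank])
    return [base + local_rank for local_rank in range(local_world_sizes[agent_rank])]
```

With three agents managing two workers each, the agent with rank 2 would own global ranks 4 and 5. The same function also covers agents that manage different numbers of workers.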

    def init_phase(self):
        try:
            active_version = self.try_create_rendezvous()
            state = json.loads(active_version.value)
            log.info("New rendezvous state created: " + str(state))
        except etcd.EtcdAlreadyExist:
            # A rendezvous is already in progress
            active_version, state = self.get_rdzv_state()
            # Note: it is possible for above query to fail (etcd.EtcdKeyNotFound),
            # but this is ok for us - just means we'll restart from beginning.
            log.info("Observed existing rendezvous state: " + str(state))

        if state["status"] == "closed":
            raise RendezvousClosedError()

        if state["status"] == "joinable":
            return self.join_phase(state["version"])

        if state["status"] == "final":
            self.handle_existing_rendezvous(state["version"])
            raise EtcdRendezvousRetryImmediately()

        self.try_wait_for_state_change(etcd_index=active_version.etcd_index + 1)
        raise EtcdRendezvousRetryableFailure()

Once the number of participating nodes reaches the minimum given in the nnodes command-line parameter, the rendezvous waits for a while. When the wait ends, or when the number of nodes reaches the maximum given in nnodes, the frozen phase starts.

In the frozen phase, each participating node confirms itself by writing a value under /rdzv/v_${version_counter}/rank_${agent_rank}. After all nodes have confirmed, the final phase begins.

In the final phase, agents that arrive later are left pending, while the agents that completed the rendezvous assign ranks to the worker processes they manage. The instance with rank 0 acts as the master. The worker processes are then created directly. In the default LocalElasticAgent, multiple processes are created locally using Python multiprocessing.

    @prof
    def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
        spec = worker_group.spec
        store = worker_group.store
        ...
        for worker in worker_group.workers:
            local_rank = worker.local_rank
            worker_env = {
                "LOCAL_RANK": str(local_rank),
                "RANK": str(worker.global_rank),
                ...
            }
            ...
            args[local_rank] = tuple(worker_args)
        ...
        self._pcontext = start_processes(
            name=spec.role,
            entrypoint=spec.entrypoint,
            args=args,
            envs=envs,
            log_dir=attempt_log_dir,
            start_method=self._start_method,
            redirects=spec.redirects,
            tee=spec.tee,
        )
        return self._pcontext.pids()

The new c10d design

The etcd-based rendezvous implementation described above guarantees strong consistency among instances about which nodes participate in training, but it also adds an extra dependency for running PyTorch training jobs. PyTorch therefore also provides a built-in implementation, c10d. Unlike the etcd-based implementation, c10d synchronizes over TCP.

def create_backend(params: RendezvousParameters) -> Tuple[C10dRendezvousBackend, Store]:
    ...
    if store_type == "file":
        store = _create_file_store(params)
    elif store_type == "tcp":
        store = _create_tcp_store(params)
    ...
    backend = C10dRendezvousBackend(store, params.run_id)
    return backend, store


def _create_tcp_store(params: RendezvousParameters) -> TCPStore:
    host, port = parse_rendezvous_endpoint(params.endpoint, default_port=29400)
    ...
    for is_server in [is_host, False]:
        ...
        store = TCPStore(
            host, port, is_master=is_server, timeout=timedelta(seconds=read_timeout)
        )
        ...
        break
    return store

c10d uses a client-server architecture: one agent runs the c10d TCPStore server, which listens on a given port and provides primitives such as compare-and-set and add. It can be thought of as a simplified in-memory database with a KV interface, similar to Redis. All agents perform rendezvous synchronization through the store hosted on that one central agent. As one would expect, this implementation falls short of etcd in availability but wins in ease of use: users of c10d no longer need to operate and maintain an etcd cluster.
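The two primitives mentioned here are enough to build simple coordination. The sketch below is an in-memory stand-in, not the PyTorch store API: the class name ToyStore, the method signatures, and the convention that expected=None means the key must be absent are all assumptions made for illustration.

```python
import threading
from typing import Dict, Optional


class ToyStore:
    """In-memory stand-in for a centralized store (not the PyTorch API)."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}
        self._lock = threading.Lock()

    def add(self, key: str, amount: int) -> int:
        """Atomically add to an integer counter and return the new value."""
        with self._lock:
            value = int(self._data.get(key, "0")) + amount
            self._data[key] = str(value)
            return value

    def compare_set(
        self, key: str, expected: Optional[str], desired: str
    ) -> Optional[str]:
        """Set key to desired only if it currently holds expected.

        Returns the value stored after the call (None if the key is still
        absent), so the caller can tell whether its write won.
        """
        with self._lock:
            current = self._data.get(key)
            if current == expected:
                self._data[key] = desired
                return desired
            return current
```

An atomic add can hand out distinct numbers to agents as they arrive, while compare-and-set lets exactly one agent claim a role. Both are the kind of building block a rendezvous needs.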

PyTorch Elastic on Kubernetes

PyTorch also provides Kubernetes support so that elastic training can be used there. Compared with versions before 1.9.0, the new distributed training approach adds some new parameters, so the PyTorch community made some changes to the CRD, building on the Kubeflow PyTorch operator. A typical elastic training example looks like this:

apiVersion: elastic.pytorch.org/v1alpha1
kind: ElasticJob
metadata:
  name: imagenet
  namespace: elastic-job
spec:
  # Use "etcd-service:2379" if you already apply etcd.yaml
  rdzvEndpoint: "<your_etcd_endpoint>:<your_etcd_port>"
  minReplicas: 1
  maxReplicas: 2
  replicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: ExitCode
      template:
        apiVersion: v1
        kind: Pod
        spec:
          containers:
            - name: elasticjob-worker
              image: torchelastic/examples:0.2.0
              imagePullPolicy: Always
              args:
                - "--nproc_per_node=1"
                - "/workspace/examples/imagenet/main.py"
                - "--arch=resnet18"
                - "--epochs=20"
                - "--batch-size=32"
                # number of data loader workers (NOT trainers)
                # zero means load the data on the same process as the trainer
                # this is set so that the container does not OOM since
                # pytorch data loaders use shm
                - "--workers=0"
                - "/workspace/data/tiny-imagenet-200"
              resources:
                limits:
                  nvidia.com/gpu: 1

Since c10d-based rendezvous was not supported initially, rdzvEndpoint must be defined in the CRD and point to an already deployed etcd cluster. The user also needs to specify minReplicas and maxReplicas. Otherwise, it is no different from a Kubeflow PyTorchJob.

PyTorch Elastic and Horovod Elastic

At present, the two designs are similar in principle. PyTorch Elastic is more extensible than Horovod Elastic: it exposes the agent, rendezvous, and other interfaces that users can extend as needed. Horovod, on the other hand, is easier to use.

PyTorch does not provide built-in support for saving state. To rebuild the training job when a worker process fails, users need to implement the logic for saving and loading checkpoints themselves. Horovod provides a built-in implementation.

Horovod and PyTorch also differ considerably in how they synchronize membership. Horovod Elastic requires the user to provide a script, discover_hosts.sh, that lets it find the nodes participating in training at run time.

$ horovodrun -np 8 --host-discovery-script discover_hosts.sh python train.py
...
$ ./discover_hosts.sh
host-1:29500
host-2:29500
host-3:29500

This effectively hands the node discovery logic over to the user. PyTorch, by contrast, uses components such as etcd and its own c10d to solve the problem of mutual discovery between nodes.
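To show what a framework can do with the output of such a script, the sketch below parses the printed lines and compares two consecutive runs to find the hosts that joined or left. Both function names are hypothetical, the name:number line format is taken from the example output above, and this is not Horovod's own implementation.

```python
from typing import Dict, Set, Tuple


def parse_hosts(output: str) -> Dict[str, int]:
    """Parse "name:number" lines, as printed by the script, into a dict."""
    hosts = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split on the last colon so the host name is kept whole.
        name, _, number = line.rpartition(":")
        hosts[name] = int(number)
    return hosts


def diff_hosts(old: Dict[str, int], new: Dict[str, int]) -> Tuple[Set[str], Set[str]]:
    """Return (added, removed) host names between two discovery runs."""
    return set(new) - set(old), set(old) - set(new)
```

Running the discovery script periodically and diffing the results is enough to notice membership changes, at the cost of a delay of up to one polling interval.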

Conclusion

To close, we summarize the issues that need attention when implementing elastic training.

First and foremost, elastic training needs a mechanism for mutual discovery between nodes and training processes. Nodes join and leave dynamically during training, and the main problem this mechanism has to solve is how to make the other nodes aware of the change. In its current design, Horovod leaves this to the user: it periodically runs user-defined logic to discover the current set of nodes. PyTorch implements highly available node discovery through third-party distributed consistency middleware such as etcd. There has also been some exploratory work on using gossip-based protocols for synchronization, which keeps high availability without introducing too many components.

Second, elastic training has to capture training failures. Both Horovod and PyTorch implement this logic through a background process (the driver in Horovod, and the local elastic agent on each node in PyTorch). When a process crashes or runs into problems during gradient communication, the background process catches the failure, rediscovers the nodes, and restarts training.

Finally, the data sharding logic and the learning rate and batch size settings also need to change accordingly. Since the number of processes involved in training grows and shrinks dynamically, it may be necessary to reset the learning rate and the data allocation logic according to the new number of training processes, so that model convergence is not affected.
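As one concrete possibility, the sketch below recomputes both settings from the current world size. It uses the linear scaling heuristic for the learning rate and a strided split for the data. Both are common choices rather than something PyTorch or Horovod prescribes, and the function names are hypothetical.

```python
from typing import List


def scaled_learning_rate(base_lr: float, base_world_size: int, world_size: int) -> float:
    """Linear scaling heuristic: keep lr proportional to the global batch size."""
    return base_lr * world_size / base_world_size


def shard_indices(num_samples: int, rank: int, world_size: int) -> List[int]:
    """Strided split: sample i goes to the worker whose rank is i % world_size."""
    return list(range(rank, num_samples, world_size))
```

After each membership change, every worker would call both functions again with its new rank and world size, so that each sample is still visited by exactly one worker per epoch.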

In this article, we first introduced the design and implementation of elastic training in PyTorch 1.9.0. We then analyzed and summarized the design differences between frameworks and the ways to implement elastic training. In our view, elastic training fits the cloud-native trend well, and using elasticity to reduce costs and improve resource utilization is the direction things are heading. We are therefore actively contributing to community work on elastic training in TensorFlow, PyTorch, and Kubeflow. More articles will be published soon. Thank you for your attention.