0x00 Summary
Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 that has been widely used in the industry.
This series takes you through the source code of Horovod. This article, the thirteenth in the series, looks at the Driver role in Horovod's elastic implementation.
This corresponds to the Driver Main part of the architecture diagram. Because it is strongly related to Host Discovery, the two are shown together. Since the Driver is the main body of elastic training, this article analyzes the calling code and the Driver together.
Links to other articles in this series are as follows:
Horovod (1) — Basics
Horovod (2) — A distributed training framework for deep learning — from the user’s perspective
Horovod (3) — What’s behind Horovodrun
Horovod (4) — Network Basics & Driver
Horovod (5) — fusion framework
Horovod (6) — background thread architecture
Horovod (7) — DistributedOptimizer
Horovod (8) — on Spark
Horovod (9) — Start on Spark
Horovod (10) — Run on Spark
Horovod (11) — on Spark — GLOO scheme
Horovod (12) — the overall architecture of elastic training
0 x01 role
The first thing we need to do is recall the roles in elastic training.
1.1 Role Setting
Horovod's elastic training involves two roles: the driver process and the worker processes. The driver process runs on a CPU node, while worker processes can run on CPU or GPU nodes.
These roles are very similar to Spark's Driver and Executor. The Driver process can be thought of as Spark's Driver or master node, and the workers are similar to Spark's Executors.
The details are as follows:
          +------------------------------+
          |                              |
          |            Driver            |
          |                              |
          |                              |
          +-----+-------+--------+-------+
                ^       ^        ^
                |       |        |
                |       |        |
         +------+       |        +----+
         |              |             |
         |              |             |
         v              v             v
+--------+----+ +-------+------+ +----+--------+
|   Worker    | |   Worker     | |   Worker    |
|             | |              | |             |
|   host1     | |   host2      | |   host3     |
+-------------+ +--------------+ +-------------+
1.2 Responsibilities
The roles' responsibilities are as follows:
Master (control node) responsibilities:
- Responsible for detecting in real time whether an existing worker (worker node) has changed or dropped out;
- Responsible for monitoring host changes in real time through the discovery script;
- Responsible for assigning tasks to the living workers (worker nodes);
- When a process fails because an AllReduce call failed, the master uses the blacklist mechanism to organize the remaining living processes into a new ring;
- If a new host is added, new workers are spawned on the new host, and the new and existing workers together construct a new communication ring.
Worker (worker node) responsibilities:
- Responsible for reporting its own status (in fact this is passive; there is no active reporting mechanism), i.e., whether its training has finished;
- Responsible for performing training on the data this worker owns (a minimal worker-side sketch follows below).
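To make the worker side concrete, the following is a minimal sketch of a worker-side elastic training script (PyTorch flavor, based on Horovod's public elastic API; the model, data and hyperparameters here are placeholders, not from this article):
import torch
import horovod.torch as hvd

hvd.init()

model = torch.nn.Linear(10, 2)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

# On a reset (host added/removed), Horovod rolls back to the last committed
# state and re-runs this function on the newly constructed communication ring.
@hvd.elastic.run
def train(state):
    for state.epoch in range(state.epoch, 5):
        for state.batch in range(state.batch, 100):
            optimizer.zero_grad()
            loss = model(torch.randn(32, 10)).sum()
            loss.backward()
            optimizer.step()
            state.commit()  # checkpoint the state so a failed AllReduce can roll back here
        state.batch = 0

state = hvd.elastic.TorchState(model, optimizer, epoch=0, batch=0)
train(state)
In a real job, state.commit() would typically be called less often (e.g., once per epoch), since committing has a cost.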
0x02 The calling part
Let's first look at the calling part. The elastic invocation goes from the general to the specific, one layer deeper at a time.
2.1 _run
The entry point of the horovod program is the _run function. As you can see, it chooses different paths depending on whether this is an elastic training job. In this article we start from _run_elastic.
def _run(args):
    # if hosts are not specified, either parse from hostfile, or default as
    # localhost
    if not args.hosts and not args.host_discovery_script:
        if args.hostfile:
            args.hosts = hosts.parse_host_files(args.hostfile)
        else:
            # Set hosts to localhost if not specified
            args.hosts = 'localhost:{np}'.format(np=args.np)

    # Convert nics into set
    args.nics = set(args.nics.split(',')) if args.nics else None

    if _is_elastic(args):
        return _run_elastic(args)  # this article covers this path
    else:
        return _run_static(args)
2.2 _run_elastic
The logic of this part is as follows:
- First, if a host discovery script is configured in the arguments, call discovery.HostDiscoveryScript to obtain a discovery object (at this point it is just an object; the host information will actually be gathered later when ElasticDriver is built). Otherwise, read the fixed host configuration directly;
- Second, build ElasticSettings from the host configuration and other information;
- Finally, call gloo_run_elastic to perform the elastic training;
The code is as follows:
def _run_elastic(args):
    # construct host discovery component
    if args.host_discovery_script:
        discover_hosts = discovery.HostDiscoveryScript(args.host_discovery_script, args.slots)
    elif args.hosts:
        _, available_host_slots = hosts.parse_hosts_and_slots(args.hosts)
        discover_hosts = discovery.FixedHosts(available_host_slots)
    ......

    # horovodrun has to finish all the checks before this timeout runs out.
    settings = elastic_settings.ElasticSettings(discovery=discover_hosts,
                                                min_np=args.min_np or args.np,
                                                max_np=args.max_np,
                                                elastic_timeout=args.elastic_timeout,
                                                reset_limit=args.reset_limit,
                                                num_proc=args.np,
                                                verbose=2 if args.verbose else 0,
                                                ssh_port=args.ssh_port,
                                                ssh_identity_file=args.ssh_identity_file,
                                                extra_mpi_args=args.mpi_args,
                                                key=secret.make_secret_key(),
                                                start_timeout=tmout,
                                                output_filename=args.output_filename,
                                                run_func_mode=args.run_func is not None,
                                                nics=args.nics,
                                                ...)

    env = os.environ.copy()
    config_parser.set_env_from_args(env, args)
    gloo_run_elastic(settings, env, args.command)
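For reference, an elastic job with a discovery script is typically launched like this (an illustrative example in the spirit of the Horovod elastic documentation; the host names, slot counts and script name are made up):
# discover_hosts.sh must print the currently available hosts, one per line,
# in the form "hostname:slots", for example:
#   host-1:4
#   host-2:4
horovodrun -np 8 --min-np 4 --max-np 12 --host-discovery-script discover_hosts.sh python train.py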
2.3 gloo_run_elastic
This part begins with the first part of the architecture diagram:
The main work is as follows:
- It defines get_common_interfaces, a function that retrieves network routing information and the hosts' network interfaces. One thing to note: it waits for the minimum number of required hosts before it starts probing network routes;
- _exec_command_fn, described earlier, provides the ability to run commands, i.e., an environment in which commands are run;
- A RendezvousServer is established to hold various host information;
- launch_gloo_elastic is run with these arguments plus the command argument, which is something like python train.py;
def gloo_run_elastic(settings, env, command):
    def get_common_interfaces(driver):
        # Host-to-host common interface detection requires at least 2 hosts in an elastic job.
        min_hosts = _get_min_start_hosts(settings)
        current_hosts = driver.wait_for_available_slots(settings.num_proc, min_hosts=min_hosts)
        return driver_service.get_common_interfaces(settings, current_hosts.host_assignment_order)

    exec_command = _exec_command_fn(settings)
    rendezvous = RendezvousServer(settings.verbose)
    launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous)
2.4 get_common_interfaces
get_common_interfaces obtains the network routing information and the hosts' network interfaces.
- If a remote host is configured, distributed execution is performed on each host to obtain the network adapter and routing information of each host.
- Otherwise, obtain information such as the local network adapter.
The specific function is in: runner/driver/driver_service.py
def get_common_interfaces(settings, all_host_names, remote_host_names=None, fn_cache=None):
    if remote_host_names is None:
        remote_host_names = network.filter_local_addresses(all_host_names)

    if len(remote_host_names) > 0:
        if settings.nics:
            # If args.nics is provided, we will use those interfaces. All the workers
            # must have at least one of those interfaces available.
            nics = settings.nics
        else:
            # Find the set of common, routed interfaces on all the hosts (remote
            # and local) and specify it in the args to be used by NCCL. It is
            # expected that the following function will find at least one interface
            # otherwise, it will raise an exception.
            local_host_names = set(all_host_names) - set(remote_host_names)
            nics = _driver_fn(all_host_names, local_host_names, settings, fn_cache=fn_cache)
    else:
        nics = get_local_interfaces(settings)
    return nics
The function get_local_interfaces obtains the NIC information of the local host.
def get_local_interfaces(settings):
    if settings.verbose >= 2:
        print('All hosts are local, finding the interfaces '
              'with the address 127.0.0.1')
    # If all the given hosts are local, find the interfaces with address
    # 127.0.0.1
    nics = set()
    for iface, addrs in net_if_addrs().items():
        if settings.nics and iface not in settings.nics:
            continue
        for addr in addrs:
            if addr.family == AF_INET and addr.address == '127.0.0.1':
                nics.add(iface)
                break
    return nics
2.5 Obtaining Remote NIC Information
This was described in detail in an earlier article; here we only summarize it briefly.
The _driver_fn function runs the probing logic in a distributed way:
- Start the service service;
- Get the addresses of the driver service via driver.addresses() (done by self._addresses = self._get_local_addresses());
- Use _launch_task_servers to start a task service on each worker (passing in the driver service's addresses); each task service registers itself with the service service;
- Because probing is done in a ring, each worker probes all the network interfaces of worker index + 1;
- Finally, _run_probe returns the intersection of the routed interfaces across all workers;
@cache.use_cache()
def _driver_fn(all_host_names, local_host_names, settings):
    """
    launches the service service, launches the task service on each worker and
    have them register with the service service. Each worker probes all the
    interfaces of the worker index + 1 (in a ring manner) and only keeps the
    routed interfaces. Function returns the intersection of the set of all the
    routed interfaces on all the workers.
    :param all_host_names: list of addresses. For example,
        ['worker-0','worker-1']
        ['10.11.11.11', '10.11.11.12']
    :type all_host_names: list(string)
    :param local_host_names: host names that resolve into a local addresses.
    :type local_host_names: set
    :param settings: the object that contains the setting for running horovod
    :type settings: horovod.runner.common.util.settings.Settings
    :return: example: ['eth0', 'eth1']
    :rtype: list[string]
    """
    # Launch a TCP server called service service on the host running horovod
    num_hosts = len(all_host_names)
    driver = HorovodRunDriverService(num_hosts, settings.key, settings.nics)

    # Have all the workers register themselves with the service service.
    _launch_task_servers(all_host_names, local_host_names,
                         driver.addresses(), settings)
    try:
        return _run_probe(driver, settings, num_hosts)
    finally:
        driver.shutdown()
2.6 launch_gloo_elastic
Here the Gloo elastic system is actually invoked: this is where the Driver-related components are generated and the elastic-training workers are established.
During run, there is only one RendezvousServer, and launch_gloo_elastic runs only once.
The logic is as follows:
- If an output filename is configured, create its directory;
- Create ElasticDriver, with the discovery object (built from the discovery script) as a parameter;
- Start the RendezvousServer, using create_rendezvous_handler as its handler;
- Use driver.wait_for_available_slots to wait for the minimum number of required slots;
- Once that is satisfied, call get_common_interfaces to get the network routes and, from them, the server IP;
- Register the shutdown event;
- Use get_run_command to get the command to run;
- Use _create_elastic_worker_fn to create the elastic-training workers;
The simplified code is as follows:
def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous):
    # Make the output directory if it does not exist
    if settings.output_filename:
        _mkdir_p(settings.output_filename)

    # Create ElasticDriver with the discovery object as a parameter
    driver = ElasticDriver(rendezvous, settings.discovery,
                           settings.min_np, settings.max_np,
                           timeout=settings.elastic_timeout,
                           reset_limit=settings.reset_limit,
                           verbose=settings.verbose)

    handler = create_rendezvous_handler(driver)
    global_rendezv_port = rendezvous.start(handler)  # start RendezvousServer
    driver.wait_for_available_slots(settings.num_proc)

    nics = get_common_interfaces(driver)  # obtain network routing, etc.
    server_ip = network.get_driver_ip(nics)

    event = register_shutdown_event()
    run_command = get_run_command(command, server_ip, nics, global_rendezv_port, elastic=True)

    # Build the elastic-training workers
    create_worker = _create_elastic_worker_fn(exec_command, run_command, env, event)

    driver.start(settings.num_proc, create_worker)
    res = driver.get_results()
    driver.stop()

    for name, value in sorted(res.worker_results.items(), key=lambda item: item[1][1]):
        exit_code, timestamp = value
It’s complicated, so we have to go through it one by one.
First, let's look at get_run_command, which was introduced earlier in the Spark Gloo article, so here we only review it briefly.
It calls create_run_env_vars to collect the information Gloo needs and builds run_command from it, which looks like this:
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.11.1. HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python
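Where do these variables come from? A rough sketch of create_run_env_vars (following the structure of horovod/runner/gloo_run.py; the exact set of variables may differ between versions):
def create_run_env_vars(server_ip, nics, port, elastic=False):
    # Workers use these to find the RendezvousServer and to pick the
    # network interface that Gloo (and NCCL) should use.
    run_envs = {
        'HOROVOD_GLOO_RENDEZVOUS_ADDR': server_ip,
        'HOROVOD_GLOO_RENDEZVOUS_PORT': port,
        'HOROVOD_CONTROLLER': 'gloo',
        'HOROVOD_CPU_OPERATIONS': 'gloo',
        'HOROVOD_GLOO_IFACE': list(nics)[0],
        'NCCL_SOCKET_IFNAME': ','.join(nics),
    }
    if elastic:
        run_envs['HOROVOD_ELASTIC'] = '1'
    return run_envs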
As you can see, the elastic version is similar to the Spark Gloo version: both use a RendezvousServer to carry out part of the master's control functionality.
Second, let's look at the Driver itself.
0x03 Driver Main
3.1 ElasticDriver
Defined as follows, the basic members are:
- _rendezvous: the driver re-initializes a rendezvous server based on the nodes that are currently running. This rendezvous stores each worker's address and the rank assigned to it in the logical communication ring;
- _host_manager: a HostManager, responsible for discovering and managing hosts;
- _worker_registry: a WorkerStateRegistry;
- _discovery_thread: responsible for periodically discovering hosts in the background; the actual work is done by calling _host_manager;
- _worker_clients: WorkerNotificationClient instances, one per worker;
- _host_assignments: the host assignments;
- _rank_assignments: the rank assignments. A rank can be thought of as representing one training process in the distributed task; rank 0 usually has a special meaning in Horovod: it is the device responsible for coordinating the synchronization;
- _world_size: the total number of processes. Training only starts once all world_size processes are ready;
- _wait_hosts_cond: a threading.Condition used to wait until the minimum number of hosts required for training is available;
Specific definitions are as follows:
class ElasticDriver(object):
    def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, verbose=0):
        self._rendezvous = rendezvous
        self._host_manager = HostManager(discovery)

        self._host_assignments = {}
        self._rank_assignments = {}
        self._world_size = 0

        self._wait_hosts_cond = threading.Condition()

        self._create_worker_fn = None
        self._worker_clients = {}

        self._worker_registry = WorkerStateRegistry(self, self._host_manager, reset_limit=reset_limit)
        self._results = ResultsRecorder()
        self._shutdown = threading.Event()

        self._discovery_thread = threading.Thread(target=self._discover_hosts)
        self._discovery_thread.daemon = True
        self._discovery_thread.start()
After the Driver is created, the main actions are:
- Wait for the minimum number of hosts;
- Configure the workers;
- Start the driver, which internally starts the workers;
- The Driver waits for the workers' results.
Let’s do it step by step.
3.2 Waiting for the minimum number of hosts
After startup, driver.wait_for_available_slots(settings.num_proc) is called to wait for the minimum number of hosts.
As you can see, this is a loop that waits indefinitely: it only returns when avail_slots >= min_np and avail_hosts >= min_hosts, i.e., when current_hosts provides at least the required minimum numbers of hosts and slots.
def wait_for_available_slots(self, min_np, min_hosts=1):
    tmout = timeout.Timeout(self._timeout, message=' ')
    self._wait_hosts_cond.acquire()
    try:
        while True:  # loop until enough hosts/slots are available
            current_hosts = self._host_manager.current_hosts
            avail_slots = current_hosts.count_available_slots()
            avail_hosts = len(current_hosts.available_hosts)
            if avail_slots >= min_np and avail_hosts >= min_hosts:
                return current_hosts
            if self._shutdown.is_set():
                raise RuntimeError('Job has been shutdown, see above error messages for details.')
            self._wait_hosts_cond.wait(tmout.remaining())
            tmout.check_time_out_for('minimum number of slots to become available')
    finally:
        self._wait_hosts_cond.release()
The logic is as follows:
   launch_gloo_elastic
            +
            |
            |
            |
            |                               +----------------+
            v                               |  HostManager   |
+-----------+---------------------+    wait |                |
| driver.wait_for_available_slots | +------>|                |
+---------------------------------+         |  current_hosts |
                                            +----------------+
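This wait is woken up by the background _discovery_thread whenever the discovered host set changes. A condensed sketch of that loop (simplified from ElasticDriver._discover_hosts; the exact error handling and polling constant differ in the real source) is:
# A condensed sketch of ElasticDriver._discover_hosts (error handling and the
# exact polling constant are simplified; see the real source for details).
def _discover_hosts(self):
    while not self._shutdown.is_set():
        self._wait_hosts_cond.acquire()
        try:
            # Re-run the discovery script and diff against the known host set
            if self._host_manager.update_available_hosts():
                # Tell the running workers that hosts were added or removed ...
                self._notify_workers_host_changes(self._host_manager.current_hosts)
                # ... and wake up anyone blocked in wait_for_available_slots
                self._wait_hosts_cond.notify_all()
        finally:
            self._wait_hosts_cond.release()
        self._shutdown.wait(DISCOVER_HOSTS_FREQUENCY_SECS)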
3.3 Configure the worker
The configuration process is completed by _create_elastic_worker_fn.
_create_elastic_worker_fn has two parts:
- _slot_info_to_command_fn builds slot_info_to_command by combining various environment variables with the run_command (similar to what we saw in an earlier article). The result is a command text that can be run on "a certain host and slot";
- It returns create_worker;
- create_worker is a function built from exec_command and the command text;
- exec_command, described earlier, provides the ability to run a command, i.e., an environment in which commands run;
- so create_worker provides the ability to run the command in that environment.
The relationship between these concepts is as follows:
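In code, that relationship looks roughly like the following (a condensed sketch that follows the structure of _create_elastic_worker_fn in the Horovod source; details are trimmed and may differ between versions):
def _create_elastic_worker_fn(exec_command, run_command, env, event):
    # Turn (run_command + env) into a function: slot_info -> concrete command text
    get_command_with_env = _slot_info_to_command_fn(run_command, env)

    def create_worker(slot_info, events):
        # Render the command for this particular host/slot ...
        command = get_command_with_env(slot_info)
        # ... and execute it via exec_command, watching the shutdown/host events
        return exec_command(command, slot_info, [event] + (events or []))

    return create_worker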
3.4 Start the driver
driver.start(settings.num_proc, create_worker)
The specific startup goes through the following steps.
3.4.1 start
def start(self, np, create_worker_fn):
    self._create_worker_fn = create_worker_fn
    self._activate_workers(np)
3.4.2 _activate_workers
Both the start and resume functions of ElasticDriver call _activate_workers, which is defined as follows. You can see that if the discovery script has found new nodes, _update_host_assignments returns pending_slots, the slots on which new workers can be started, and _start_worker_processes is then called:
def _activate_workers(self, min_np):
    current_hosts = self.wait_for_available_slots(min_np)
    pending_slots = self._update_host_assignments(current_hosts)
    self._worker_registry.reset(self.world_size())
    self._start_worker_processes(pending_slots)
3.4.3 _start_worker_processes
Here each worker is started in its own thread via run_worker, and self._results.expect(thread) registers the thread with the ResultsRecorder. This is the key to waiting for results.
def _start_worker_processes(self, pending_slots):
    for slot_info in pending_slots:
        self._start_worker_process(slot_info)

def _start_worker_process(self, slot_info):
    create_worker_fn = self._create_worker_fn
    shutdown_event = self._shutdown
    host_event = self._host_manager.get_host_event(slot_info.hostname)

    def run_worker():
        res = create_worker_fn(slot_info, [shutdown_event, host_event])
        exit_code, timestamp = res
        self._handle_worker_exit(slot_info, exit_code, timestamp)

    thread = threading.Thread(target=run_worker)  # start the training thread
    thread.daemon = True
    thread.start()
    self._results.expect(thread)  # wait for the result of the run
3.5 Waiting for the Running Result
The Driver uses the following to obtain the result.
def get_results(self):
    return self._results.get_results()
_results is the ResultsRecorder type, so we need to look at the implementation.
3.5.1 ResultsRecorder
Its main functions are as follows:
- expect waits for a thread: expect does self._worker_threads.put(worker_thread), so the recorder knows which threads to wait on;
- add_result records a result: after a worker exits, _handle_worker_exit calls self._results.add_result(name, (exit_code, timestamp)) to record the result;
- get_results: the driver calls this to collect the results; it joins every worker thread.
class ResultsRecorder(object):
    def __init__(self):
        self._error_message = None
        self._worker_results = {}
        self._worker_threads = queue.Queue()

    def expect(self, worker_thread):
        self._worker_threads.put(worker_thread)

    def add_result(self, key, value):
        if key in self._worker_results:
            return
        self._worker_results[key] = value

    def get_results(self):
        while not self._worker_threads.empty():
            worker_thread = self._worker_threads.get()
            worker_thread.join()
        return Results(self._error_message, self._worker_results)
3.5.2 Worker exit
The Driver uses _handle_worker_exit to handle the end of a specific worker; what happens next depends on the worker's return value.
_handle_worker_exit runs in the worker thread. When it runs, it registers the result in the ResultsRecorder via self._results.add_result.
def _handle_worker_exit(self, slot_info, exit_code, timestamp):
    if not self.has_rank_assignment(slot_info.hostname, slot_info.local_rank):
        # Ignore hosts that are not assigned a rank
        return

    if exit_code == 0:  # success: record the success
        rendezvous_id = self._worker_registry.record_success(slot_info.hostname, slot_info.local_rank)
    else:  # otherwise record the failure
        rendezvous_id = self._worker_registry.record_failure(slot_info.hostname, slot_info.local_rank)

    if self.finished() and self._worker_registry.last_rendezvous() == rendezvous_id:
        name = '{}[{}]'.format(slot_info.hostname, slot_info.local_rank)
        self._results.add_result(name, (exit_code, timestamp))  # register the result with ResultsRecorder
The details are as follows:
+-----------------------------+
|  ElasticDriver              |
|                             |
|      start                  |
|        +                    |
|        | 1                  |
|        |                    |
|        v                    |
|  _activate_workers          |
|        +                    |                 +-------------------------+
|        |                    |                 |         Thread          |
|        | 2                  |                 |                         |
|        v                    |                 |       run_worker        |
|  _start_worker_processes    |                 |           +             |
|        +                    |                 |           |             |
|        |                    |                 |           | 7           |
|        | 3                  |                 |           v             |
|        v                    |                 |    create_worker_fn     |
|  _start_worker_process      |                 |           +             |
|        +                    |                 |           |             |
|        |                    |                 |           | 8           |
|        | 4                  |                 |           v             |        results.add_result
|        v                    | thread.start()  |  _handle_worker_exit +-----------------------------------+
|  run_worker +-------------------------------->|                         |                            9   |
|        +                    |      5          +-------------------------+                                |
|        |                    |                                                                            |
|        |                    |               +------------------------------------------------------+     |
|        v                    | expect(thread)|  ResultsRecorder                                     |     |
|  self._results +--------------------------->|                                                      |     |
|                             |      6        |  _worker_threads = [thread]                          |     |
|                             |               |                                                      |     |
|                             |               |  _worker_results = {name : (exit_code, timestamp)} <-------+
+-----------------------------+               |                                                      |
                                              +------------------------------------------------------+
In subsequent articles we will look at the other parts of elastic training in detail.
Because the Driver is the main framework of elastic training, some of this text will inevitably reappear in other articles; please bear with it.
0xEE Personal information
Thoughts on life and technology
Wechat public account: Rosie’s Thinking
If you want to get timely updates on new articles, or see the technical material I recommend, please follow it.
0xFF Reference
Horovod for ElasticDL in Kubernetes
Kubernetes Training — Distributed deep learning training using Horovod on Kubernetes
Elastic Training Operator: an elastic deep learning training tool on Kubernetes
ElasticHorovod: elastic and fault-tolerant distributed training
Horovod elastic training