Preface

Parallel computing uses parallel computers to reduce the time needed to solve a single computational problem. By writing programs that explicitly describe how different parts of a computation run on different processors at the same time, we can greatly improve a program's efficiency.

It is well known that the GIL in Python prevents Python's multithreading from exploiting multi-core CPUs. However, there are various ways to make Python truly use multiple cores, such as multithreading/multiprocessing through C/C++ extensions, or direct multi-process programming with Python's built-in multiprocessing module.
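As a quick illustration of the point above, here is a minimal sketch (the CPU-bound `count_down` task and both helper names are made up for illustration) that runs the same amount of pure-Python work on two threads and then on two worker processes. On a multi-core machine, only the process version can actually run in parallel, because each process has its own GIL:

```python
import time
from multiprocessing import Pool
from threading import Thread

def count_down(n):
    # Pure-Python CPU-bound loop; the GIL serializes it across threads
    while n > 0:
        n -= 1

def time_threads(total, nworkers):
    # Split the work across threads; the GIL keeps them on one core
    threads = [Thread(target=count_down, args=(total // nworkers,))
               for _ in range(nworkers)]
    t0 = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - t0

def time_processes(total, nworkers):
    # Split the same work across real OS processes, one GIL each
    t0 = time.time()
    pool = Pool(nworkers)
    pool.map(count_down, [total // nworkers] * nworkers)
    pool.close()
    pool.join()
    return time.time() - t0

if __name__ == "__main__":
    total = 20000000
    print("threads:   %.2f s" % time_threads(total, 2))
    print("processes: %.2f s" % time_processes(total, 2))
```

On a single-core machine (or for I/O-bound work) the two timings would be comparable; the gap only appears for CPU-bound work on multiple cores.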

This article tries to optimize my own kinetic calculation program using only Python's built-in multiprocessing module, covering:

  • Parallelism and speedup comparison using multi-core resources on a single machine
  • A simple multi-machine distributed computation using the Manager class

This article is not a tutorial on the multiprocessing interfaces; readers unfamiliar with the module can refer to the official documentation: docs.python.org/2/library/m…

Main text

Recently, I wanted to run a series of calculations with my own microkinetics program and plot the results as a two-dimensional map for visualization. To do this, I need to compute many points on the 2D map, collect the results, and plot them. Since each point requires an ODE integration and a Newton-method equation solve, drawing the whole map serially is very inefficient, especially when testing parameters, where each plot means a long wait. Each point on the 2D map is computed independently, so parallelization is the natural choice.

The original serial version

As the script is quite long and built on our own program, only its general structure is shown below. In essence, it is a double loop whose loop variables are the partial pressures of the reactant gases (O2 and CO):

import time

import numpy as np

# Partial pressure ranges for CO and O2
pCOs = np.linspace(1e-5, 0.5, 10)
pO2s = np.linspace(1e-5, 0.5, 10)

if "__main__" == __name__:
    try:
        start = time.time()
        for i, pO2 in enumerate(pO2s):
            for j, pCO in enumerate(pCOs):
                # Perform the kinetic solution for the current pCO, pO2
                pass
        end = time.time()
        t = end - start
    finally:
        # Collect the results and draw the map
        pass

The whole process is that simple: all I need to do is parallelize the double loop using the multiprocessing interfaces.

The time required to draw 100 points using single-core serial is as follows, taking a total of 240.76 seconds:

The 2D map looks like this:

Multi-process parallel processing

Multiprocessing module

The multiprocessing module provides interfaces similar to those of the threading module and encapsulates the various process operations. It also provides interfaces such as Pipe and Queue for inter-process communication and synchronization.
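As a minimal sketch of such inter-process communication (the `worker` function and the queue names are made up for illustration), a child process can receive work through one Queue and return its result through another:

```python
from multiprocessing import Process, Queue

def worker(tasks, results):
    # Read one number from the task queue and send back its square
    n = tasks.get()
    results.put(n * n)

if __name__ == "__main__":
    tasks, results = Queue(), Queue()
    p = Process(target=worker, args=(tasks, results))
    p.start()
    tasks.put(7)
    print(results.get())  # prints 49
    p.join()
```

Both Queue endpoints can be shared by several processes, which is what makes the producer/consumer patterns used later in this article possible.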

Use the Process class to create processes dynamically for parallelism

The multiprocessing module provides a Process class: by creating a Process object and calling its start method, you launch a real OS process to perform a task. The interface is similar to the Thread class in the threading module.
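A minimal sketch of this pattern (with a hypothetical `solve_point` standing in for the real per-point calculation) might look like:

```python
from multiprocessing import Process

def solve_point(i):
    # Stand-in for the kinetic solution of one point on the map
    print("point %d solved" % i)

if __name__ == "__main__":
    # One Process object per point; each start() launches a real OS process
    processes = [Process(target=solve_point, args=(i,)) for i in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()  # wait for every child to finish
```

Note that collecting return values from bare Process objects requires explicit Queues or Pipes, which is exactly the bookkeeping the next section avoids.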

When the number of tasks is small, Process can be used to generate several processes dynamically. But if many processes are needed, manually limiting the process count and handling the return values of the different processes becomes extremely tedious, so we turn to a process pool to simplify things.

Use process pools to manage processes

The multiprocessing module provides a Pool class; by creating a Pool object, you can hand tasks off to different child processes and easily retrieve their return values. Loop parallelism, for example, is easy to implement this way.

For the single-instruction-multiple-data parallelism here, we can use pool.map() directly to map a function onto an argument list. pool.map is a parallel version of the built-in map: it blocks until all processes have finished, and the results are returned in the same order as the inputs.
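A tiny self-contained sketch (using a made-up `square` function) shows both properties, blocking and order preservation:

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    pool = Pool(4)
    # map blocks until all children finish; results keep input order
    results = pool.map(square, range(10))
    pool.close()
    pool.join()
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Because the output order matches the input order, the caller never needs to track which child computed which element.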

First, I encapsulate the processing for each partial pressure value into a function that can be passed to the child processes for execution:

import time
from multiprocessing import Pool

import numpy as np

# Partial pressure ranges for CO and O2
pCOs = np.linspace(1e-5, 0.5, 10)
pO2s = np.linspace(1e-5, 0.5, 10)

def task(pO2):
    """Accept an O2 partial pressure and perform kinetic solutions
    for all CO partial pressures."""

if "__main__" == __name__:
    try:
        start = time.time()
        pool = Pool()
        tofs = pool.map(task, pO2s)
        end = time.time()
        t = end - start
    finally:
        # Collect the results and draw the map
        pass

Using two cores, the calculation time drops from 240.76 s to 148.61 s, a speedup of 1.62.

Testing the speedup with different numbers of cores

To see how different numbers of cores improve the program's efficiency, I plotted the running time and speedup against the number of cores; the results are as follows:

Number of running cores and program running time:

Number of running cores and speedup:

Since the outer loop runs only 10 times, using more than 10 cores cannot speed up the program any further; the extra cores are simply wasted.

Use Manager for simple distributed computing

The multiprocessing interfaces used so far handle multi-core parallelism on a single machine, but multiprocessing has more uses. With it we can implement simple multi-machine distributed parallel computing and distribute computing tasks to different computers.

The managers submodule provides further tools for multi-process communication: interfaces and data objects for sharing data between multiple computers. These are all implemented through proxy classes, such as ListProxy and DictProxy, which implement the same interfaces as the built-in list and dict but can be shared over the network between processes on different computers.

For details on the managers interfaces, refer to the official documentation: docs.python.org/2/library/m…
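Before adapting the real program, here is a minimal local sketch of the idea (the class name, list name, address, and authkey are all made up for illustration): a plain list is served by a BaseManager subclass, and several processes mutate it through a ListProxy as if it were a normal list:

```python
from multiprocessing import Process
from multiprocessing.managers import BaseManager, ListProxy

# A hypothetical shared result list, one slot per job
results = [None] * 4

class ShareManager(BaseManager):
    pass

# Expose the list through a ListProxy so remote processes
# see the same interface as a built-in list
ShareManager.register('get_results', callable=lambda: results,
                      proxytype=ListProxy)

def fill(proxy, idx):
    # Writes through the proxy are visible to every connected process
    proxy[idx] = idx * idx

if __name__ == "__main__":
    # Port 0 lets the OS pick a free port; a real server fixes the port
    manager = ShareManager(address=('127.0.0.1', 0), authkey=b'secret')
    manager.start()  # serve the shared objects in a subprocess
    proxy = manager.get_results()
    workers = [Process(target=fill, args=(proxy, i)) for i in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(proxy[:])
    manager.shutdown()
```

Here everything runs on one machine, but nothing in the code depends on that: pointing `address` at another host's IP is all that separates this from the true multi-machine setup below.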

Now let's adapt the plotting program to run distributed and parallel across multiple computers. The main ideas of the adaptation are:

  1. One computer acts as the server. It manages the shared objects, allocates tasks, and receives results through a Manager object, then collects the results for post-processing (drawing the two-dimensional map).
  2. The other computers act as clients. They fetch data from the server, perform the calculations, and write the results back into the shared data for the server to collect. Meanwhile, each client can also use the multi-process parallelism shown above to make full use of its own cores.

It can be summarized as the following figure:

The server process

First, the server needs a Manager object to manage the shared objects:

from multiprocessing import Queue
from multiprocessing.managers import BaseManager, ListProxy

def get_manager():
    """Create the manager object for the server."""
    class JobManager(BaseManager):
        pass

    # Task queue holding the job indices to be dispatched
    jobid_queue = Queue()
    JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)

    # Shared list for collecting the results from all clients
    tofs = [None]*N
    JobManager.register('get_tofs_list', callable=lambda: tofs,
                        proxytype=ListProxy)

    # Share the partial pressure lists with the clients
    JobManager.register('get_pCOs', callable=lambda: pCOs,
                        proxytype=ListProxy)
    JobManager.register('get_pO2s', callable=lambda: pO2s,
                        proxytype=ListProxy)

    manager = JobManager(address=(ADDR, PORT), authkey=AUTHKEY)
    return manager


  1. BaseManager.register is a class method that binds a type or callable to the manager object and shares it over the network, so that other computers on the network can obtain the object. For example,
JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)

Here I bind a function object that returns the task queue to the manager object and share it over the network, so that processes in the network can obtain the same queue through the get_jobid_queue method of their own manager objects, achieving data sharing.

  2. Creating the manager object requires two parameters:

  • address is the (IP address, port) pair on which the manager listens for connections. For example, if I want to listen on port 5000 at the intranet address 192.168.0.1, this parameter would be ('192.168.0.1', 5000)
  • authkey, as the name suggests, is an authentication code used to verify that a client may connect to the server. This parameter must be a string object.

Assign tasks

Above we bound a task queue to the Manager object, and now I need to populate the queue so that the task can be dispatched to different clients for parallel execution.

def fill_jobid_queue(manager, nclient):
    indices = range(N)
    interval = N // nclient  # number of jobs per client
    jobid_queue = manager.get_jobid_queue()
    start = 0
    for i in range(nclient):
        jobid_queue.put(indices[start: start+interval])
        start += interval
    # Put any remaining jobs on the queue as one extra chunk
    if N % nclient > 0:
        jobid_queue.put(indices[start:])

A "task" here is just a list of index values into the parameter list. Each computer fills its results into the shared result list at the corresponding indices, so the server can collect every computer's results through the shared network.
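To make the chunking concrete, here is a small pure-Python sketch (the helper name `chunk_job_ids` is made up) that reproduces what fill_jobid_queue puts on the queue, returning the chunks instead of enqueuing them:

```python
def chunk_job_ids(N, nclient):
    """Split range(N) into nclient contiguous chunks plus a remainder,
    mirroring what fill_jobid_queue puts on the task queue."""
    indices = list(range(N))
    interval = N // nclient  # jobs per client
    chunks = []
    start = 0
    for _ in range(nclient):
        chunks.append(indices[start: start + interval])
        start += interval
    # Leftover jobs become one extra chunk
    if N % nclient > 0:
        chunks.append(indices[start:])
    return chunks

print(chunk_job_ids(10, 3))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

So with 10 jobs and 3 clients, each client first picks up a chunk of 3, and whichever client finishes first also gets the remainder chunk `[9]`.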

Start the server to listen

def run_server():
    """Start the server: dispatch tasks and wait for the results."""
    # Create the manager for the shared objects
    manager = get_manager()
    print "Start manager at {}:{}...".format(ADDR, PORT)
    # Start the manager server in a subprocess
    manager.start()
    # Fill the task queue
    fill_jobid_queue(manager, NNODE)
    shared_job_queue = manager.get_jobid_queue()
    shared_tofs_list = manager.get_tofs_list()
    queue_size = shared_job_queue.qsize()
    # Loop until every slot in the result list has been filled
    while None in shared_tofs_list:
        if shared_job_queue.qsize() < queue_size:
            queue_size = shared_job_queue.qsize()
            print "Job picked..."
    return manager

The task process

The server process is responsible for simple task assignment and scheduling, while the task process is only responsible for obtaining tasks and computing processing.

The basic code of the task process (client) is much the same as the multi-core script above (the same function just processes different data), but we also need to create a manager for the client to fetch tasks and return results.

from multiprocessing.managers import BaseManager

def get_manager():
    """Create the manager object for a client (worker)."""
    class WorkManager(BaseManager):
        pass

    # The client only fetches shared objects over the network,
    # so no callable is needed when registering
    WorkManager.register('get_jobid_queue')
    WorkManager.register('get_tofs_list')
    WorkManager.register('get_pCOs')
    WorkManager.register('get_pO2s')

    manager = WorkManager(address=(ADDR, PORT), authkey=AUTHKEY)
    return manager

On the client side we can still use multi-process and multi-core resources to speed up computation.

import Queue
from multiprocessing import Pool

if "__main__" == __name__:
    manager = get_manager()
    print "work manager connect to {}:{}...".format(ADDR, PORT)
    manager.connect()

    # Get the shared objects through their proxies
    shared_tofs_list = manager.get_tofs_list()
    shared_jobid_queue = manager.get_jobid_queue()
    shared_pCOs = manager.get_pCOs()
    shared_pO2s = manager.get_pO2s()

    # Use a process pool locally to exploit all cores on the client
    pool = Pool()

    while 1:
        try:
            indices = shared_jobid_queue.get_nowait()
            pO2s = [shared_pO2s[i] for i in indices]
            print "Run {}".format(str(pO2s))
            tofs_2d = pool.map(task, pO2s)
            # Update the shared tofs list
            for idx, tofs_1d in zip(indices, tofs_2d):
                shared_tofs_list[idx] = tofs_1d
        # Stop when the task queue is empty
        except Queue.Empty:
            break

Next, I will conduct a simple distributed computing test on three computers in the same LAN.

  • One is the management node of my lab's cluster, with intranet IP address 10.10.10.245
  • Another is a compute node in the cluster, with 12 cores
  • The last is my own laptop, with 4 cores
  1. First run the service script on the server for task assignment and monitoring:
python server.py

  2. Run the task script on both clients to fetch tasks from the task queue and execute them:

python worker.py

When the task queue is empty, the task processes terminate; once all the results in the result list have been collected, the service process terminates.

The execution process is shown as follows:

The execution result is shown as follows:

The top panel is the server listener, the bottom left panel is the result of its own notebook, and the bottom right panel is one of the nodes in the cluster.

As you can see, the total running time is 56.86 s, though it is my own laptop that is holding things back (-_-!

Conclusion

In this article, multi-core parallelism on a single machine and simple distributed parallel computing across multiple computers were implemented with Python's built-in multiprocessing module. multiprocessing provides a well-packaged and friendly interface that lets Python programs use multi-core resources to speed up their computations. I hope this helps those who use Python to implement parallelism.

References

  • docs.python.org/2/library/m…
  • Distributed process – official website of Liao Xuefeng