When Python is used to process tasks, a single thread can only get so far, so tasks often need to be parallelized and executed by multiple threads or processes.

concurrent.futures is a library that makes it very easy to parallelize tasks. The name is a bit long, so below I'll simply call it concurrent instead of concurrent.futures.

Concurrent provides two execution models: the multi-threaded ThreadPoolExecutor and the multi-process ProcessPoolExecutor. The multi-threaded model is appropriate for IO-intensive tasks, while the multi-process model should be used for computation-intensive tasks.

Why this split? Python's GIL prevents the interpreter from using multiple cores effectively, so a pure computing task can never saturate more than a single CPU core. To break through this bottleneck, the computation must be shared among multiple forked child processes. For IO-intensive tasks, on the other hand, CPU utilization is usually very low; multi-threading raises it, but it still stays far from saturation (100%). Since a single core can handle the overall computation, it is natural to choose the model with the smaller resource footprint, namely multi-threading.

Next, let’s try both modes for parallel computation.

Multithreading

# coding: utf8
# t.py

import time
import fire
import threading
from concurrent.futures import ThreadPoolExecutor, wait


# Split subtasks
def each_task(index):
    time.sleep(1)  # Sleep for 1s, simulate IO
    print "thread %s square %d" % (threading.current_thread().ident, index)
    return index * index  # return result


def run(thread_num, task_num):
    # thread_num specifies the number of threads in the pool
    executor = ThreadPoolExecutor(thread_num)
    start = time.time()
    fs = []  # list of future
    for i in range(task_num):
        fs.append(executor.submit(each_task, i))  # Submit task
    wait(fs)  # Wait for the calculation to finish
    end = time.time()
    duration = end - start
    s = sum([f.result() for f in fs])  # sum
    print "total result=%s cost: %.2fs" % (s, duration)
    executor.shutdown()  # Destroy the thread pool


if __name__ == '__main__':
    fire.Fire(run)

Run python t.py 2 10, that is, 2 threads running 10 tasks, and observe the output

thread 123145422131200 square 0thread 123145426337792 square 1

thread 123145426337792 square 2
 thread 123145422131200 square 3
thread 123145426337792 square 4
thread 123145422131200 square 5
thread 123145426337792 square 6
thread 123145422131200 square 7
thread 123145426337792 square 8
thread 123145422131200 square 9
total result=285 cost: 5.02s

We see that the computation took about 5s: 10 tasks each sleep for 1s, so there is 10s of sleep in total shared by two threads, hence 5s. The reader may ask why the output is garbled. The reason is that the print operation is not atomic: it consists of two consecutive write operations, the first writing the content and the second writing the newline, so lines from different threads can interleave. If we modify the code slightly and replace print with a single write call, the output becomes tidy (whether write itself is absolutely atomic is a further debate).

# Split subtasks
def each_task(index):
    time.sleep(1)  # Sleep for 1s, simulate IO
    import sys
    sys.stdout.write("thread %s square %d\n" % (threading.current_thread().ident, index))
    return index * index  # return result

Let’s run python t.py 2 10 again and watch the output

thread 123145438244864 square 0
thread 123145442451456 square 1
thread 123145442451456 square 2
thread 123145438244864 square 3
thread 123145438244864 square 4
thread 123145442451456 square 5
thread 123145438244864 square 6
thread 123145442451456 square 7
thread 123145442451456 square 9
thread 123145438244864 square 8
total result=285 cost: 5.02s

Next, we change the parameters to scale up to 10 threads to see how long it takes all the tasks to complete

> python t.py 10 10
thread 123145327464448 square 0
thread 123145335877632 square 2
thread 123145331671040 square 1
thread 123145344290816 square 4
thread 123145340084224 square 3
thread 123145348497408 square 5
thread 123145352704000 square 6
thread 123145356910592 square 7
thread 123145365323776 square 9
thread 123145361117184 square 8
total result=285 cost: 1.01s

You can see that all the tasks finish in about 1s. This is the beauty of multithreading: multiple IO operations run in parallel, which reduces the overall processing time.

Multiple processes

While multithreading is better suited for IO-intensive tasks, multiprocessing is better suited for computation-intensive tasks. Next we'll simulate a computation-intensive task. My PC has two cores, which is enough to experience the benefits of multi-core computing.

So how do we simulate a computation-intensive task? We can use a series formula for π.
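
The code in p.py below sums the Basel series: 1/1² + 1/2² + 1/3² + … + 1/n² converges to π²/6, so π ≈ sqrt(6 · (1/1² + 1/2² + … + 1/n²)) when n is large.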

By increasing the length of the series, n, we can get arbitrarily close to π. When n is very large, the computation is slow and keeps the CPU busy the whole time, which is exactly what we want.

Ok, now let’s write the multi-process parallel computation code

# coding: utf8
# p.py

import os
import sys
import math
import time
import fire
from concurrent.futures import ProcessPoolExecutor, wait


# Split subtasks
def each_task(n):
    # Calculate PI according to the formula
    s = 0.0
    for i in range(n):
        s += 1.0 / (i + 1) / (i + 1)
    pi = math.sqrt(6 * s)
    # os.getpid() gets the current (child) process id
    sys.stdout.write("process %s n=%d pi=%s\n" % (os.getpid(), n, pi))
    return pi


def run(process_num, *ns):  # Enter multiple n values and divide them into multiple subtasks
    # process_num specifies the number of processes in the pool
    executor = ProcessPoolExecutor(process_num)
    start = time.time()
    fs = []  # list of future
    for n in ns:
        fs.append(executor.submit(each_task, int(n)))  # Submit task
    wait(fs)  # Wait for the calculation to finish
    end = time.time()
    duration = end - start
    print "total cost: %.2fs" % duration
    executor.shutdown()  # Destroy the process pool


if __name__ == '__main__':
    fire.Fire(run)

As you can see from the code, the multi-process version is barely different from the multi-thread version: change the name of the executor class and everything else stays the same. This is part of the appeal of the concurrent library, which hides the multi-threaded and multi-process models behind the same interface.
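
As a small illustration (the make_executor helper is our own, not part of the library), the pool type can even be chosen at runtime precisely because both classes share the same Executor interface:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def make_executor(kind, size):
    # "thread" for IO-intensive work, "process" for CPU-intensive work
    if kind == "thread":
        return ThreadPoolExecutor(size)
    return ProcessPoolExecutor(size)

# executor = make_executor("process", 2) is used exactly like the thread version above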

Next we run python p.py 1 5000000 5001000 5002000 5003000 to compute π 4 times in a single process, and observe the output.

process 96354 n=5000000 pi=3.1415924626
process 96354 n=5001000 pi=3.14159246264
process 96354 n=5002000 pi=3.14159246268
process 96354 n=5003000 pi=3.14159246272
total cost: 9.45s

You can see that as n increases, the result gets closer and closer to π, and since only one process is used, the tasks execute sequentially, taking about 9.5 seconds in total.

Then add another process and observe the output

> python p.py 2 5000000 5001000 5002000 5003000
process 96529 n=5001000 pi=3.14159246264
process 96530 n=5000000 pi=3.1415924626
process 96529 n=5002000 pi=3.14159246268
process 96530 n=5003000 pi=3.14159246272
total cost: 4.98s

The elapsed time is nearly halved, which shows that multiple processes really do parallelize the computation. If you watch the processes with the top command at this point, both are running close to 100% CPU.

If we add two more processes, can we compress the computation time even further?

> python p.py 4 5000000 5001000 5002000 5003000
process 96864 n=5002000 pi=3.14159246268
process 96862 n=5000000 pi=3.1415924626
process 96863 n=5001000 pi=3.14159246264
process 96865 n=5003000 pi=3.14159246272
total cost: 4.86s

It seems the time savings cannot continue: with only two computing cores, two processes are already enough to saturate them, and no matter how many more processes we add, there are still only two cores available.

Digging into the internals

Concurrent is very simple to use, but its internal implementation is not so easy to understand. Before diving into the internals, we need to understand the Future object. In the earlier examples, every task submitted to an executor returned a Future object, which you can think of as a slot reserved for a result. The slot is empty at the moment the task is submitted; once a child thread finishes the task, it pushes the result into the slot, and the main thread can then fetch it from the Future object. Put simply, the Future object is the medium through which the main thread and the child threads communicate.

class Future(object):

    def __init__(self):
        self._condition = threading.Condition()  # condition variable
        self._result = None

    def result(self, timeout=None):
        with self._condition:             # the lock must be held before waiting
            if self._result is None:
                self._condition.wait(timeout)
            return self._result

    def set_result(self, result):
        with self._condition:             # the lock must be held before notifying
            self._result = result
            self._condition.notify_all()

The main thread submits a task to the thread pool and gets back a Future object whose _result is still empty. If the main thread calls result() at this point, it blocks on the condition variable. When a child thread finishes the computation, it immediately calls set_result() to fill the Future with the result and wake up whoever is blocked on the condition variable, i.e. the main thread. The main thread then wakes up and returns the result normally.
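
To see this hand-off in action, here is a minimal usage sketch of the simplified Future above (the worker function and the 1-second sleep are just illustration):

import threading
import time

f = Future()                      # the simplified Future defined above

def worker():
    time.sleep(1)                 # simulate the task taking a while
    f.set_result(42)              # child thread pushes the result into the slot

threading.Thread(target=worker).start()
print(f.result())                 # main thread blocks here until set_result() runs, then prints 42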

Internal structure of the thread pool

The interaction between the main thread and the child threads has two parts: first, how the main thread hands tasks to the child threads, and second, how the child threads hand results back to the main thread. We have already seen that the second part is done through Future objects. So how does the first part work?

The secret lies in a queue, through which the main thread passes tasks to the child threads. Once the main thread pushes a task into the queue, the child threads start competing for it; exactly one thread grabs the task, executes it immediately, and then puts the result into the Future object, completing the task.
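
Here is a minimal sketch (not the real implementation) of how a thread pool can be built out of a single queue shared by worker threads, reusing the simplified Future from the previous section:

import threading
try:
    import Queue as queue        # Python 2
except ImportError:
    import queue                 # Python 3

task_queue = queue.Queue()       # the main thread produces, worker threads consume

def worker():
    while True:
        fn, args, future = task_queue.get()   # worker threads compete for tasks here
        future.set_result(fn(*args))          # push the result into the Future

for _ in range(2):
    t = threading.Thread(target=worker)
    t.daemon = True              # let the process exit even if workers are idle
    t.start()

# Submitting a task is then just:
# f = Future(); task_queue.put((each_task, (3,), f)); print(f.result())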

Disadvantages of thread pools

A major design problem with concurrent's thread pool is that its task queue is unbounded. If the producer submits tasks faster than the pool can consume them, tasks pile up; if the pile keeps growing, memory keeps growing until the process runs out of memory (OOM) and every queued task is lost. Users must keep this in mind and apply appropriate control.
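
concurrent.futures itself offers no switch for this, but one simple mitigation is to throttle submissions yourself, for example with a semaphore. A sketch (MAX_PENDING and submit_throttled are our own names, not part of the library):

import threading
from concurrent.futures import ThreadPoolExecutor

MAX_PENDING = 100                          # hypothetical limit, tune to your workload
slots = threading.BoundedSemaphore(MAX_PENDING)
executor = ThreadPoolExecutor(4)

def submit_throttled(fn, *args):
    slots.acquire()                        # block the producer while the backlog is full
    future = executor.submit(fn, *args)
    future.add_done_callback(lambda f: slots.release())  # free a slot when the task finishes
    return future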

Process pool internal structure

The internal structure of the process pool is complicated; even the author of concurrent found it complex enough to draw an ASCII diagram in the source code to explain the model.

I find that diagram hard to follow, so I have drawn a separate one. Please study the two figures together as we walk through a complete task-processing flow:

  1. The main thread puts the task into the TaskQueue (an ordinary in-memory queue) and gets a Future object back.
  2. The one and only management thread takes the task from the TaskQueue and puts it into the CallQueue.
  3. Child processes compete for tasks from the CallQueue and process them.
  4. A child process puts its result into the ResultQueue (a cross-process queue).
  5. The management thread fetches the result from the ResultQueue and stores it in the corresponding Future object.
  6. The main thread reads the result from the Future object.
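
The following is a highly simplified, illustrative sketch of steps 2 and 5 as performed by the management thread; the real loop inside concurrent.futures handles many more cases (shutdown, broken pools, exceptions) and is structured differently:

def management_thread(task_queue, call_queue, result_queue, pending_futures):
    while True:
        # Step 2: move work items from the in-memory TaskQueue into the bounded,
        # cross-process CallQueue while there is room for them.
        while not call_queue.full() and not task_queue.empty():
            work_id, fn, args = task_queue.get()
            call_queue.put((work_id, fn, args))
        # Step 5: take a finished result off the ResultQueue and fill in the
        # corresponding Future so the main thread can read it (step 6).
        work_id, result = result_queue.get()
        pending_futures.pop(work_id).set_result(result)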

There are three queues involved in this process, with an extra management thread in between. Why did the author design it in such a complicated way, and what are the benefits of this design?

First, look at the left side of the diagram: it is not much different from the thread-pool flow, except that there is only one management thread, whereas the thread pool has multiple worker threads. This is what lets the multi-process model be used exactly like the multi-thread model, and why the two models expose the same interface: the management thread in the middle hides the multi-process interaction logic behind it.

Then, on the right side of the diagram, the management thread interacts with the child processes through two queues, both of which are multiprocessing.Queue instances. The CallQueue has a single producer (the management thread) and multiple consumers (the child processes), whereas the ResultQueue has multiple producers (the child processes) and a single consumer (the management thread).

The CallQueue is a bounded queue whose limit is hard-coded as the number of child processes + 1. If the child processes can't keep up, the CallQueue fills up and the management thread stops feeding it. The TaskQueue, however, is unbounded: no matter how fast its consumer (the management thread) drains it, its contents can keep growing without limit, so it can eventually lead to OOM.

Cross-process queue

The cross-process queues in the process pool model are implemented with multiprocessing.Queue. So what are the internal details of this cross-process queue, and what technology is it built on?

Queue uses an unnamed socketpair for cross-process communication. The difference between a socketpair and an ordinary socket is that it needs no port and no network protocol stack; the two processes communicate directly through the kernel's socket read/write buffers.

When the parent process passes a task to a child process, it uses pickle to serialize the task object into a byte array and writes that byte array into the kernel buffer through the socketpair's write descriptor. The child process then reads the byte array from the buffer and uses pickle to deserialize it back into the task object, which it can finally execute. Passing results back to the parent works the same way, except through a different unnamed socketpair, the one created inside the ResultQueue.
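
Here is a minimal, Unix-only sketch of that mechanism: the parent pickles an object and sends it to a forked child over an unnamed socketpair (the real multiprocessing.Queue layers buffering, a feeder thread and locking on top of this idea):

import os
import socket
import pickle

parent_sock, child_sock = socket.socketpair()   # unnamed socket pair, no port needed

pid = os.fork()
if pid == 0:                                    # child process
    parent_sock.close()
    data = child_sock.recv(65536)               # read the byte array from the kernel buffer
    task = pickle.loads(data)                   # deserialize it back into an object
    print(task)
    os._exit(0)
else:                                           # parent process
    child_sock.close()
    parent_sock.sendall(pickle.dumps(("square", 7)))   # serialize the task and write it
    os.waitpid(pid, 0)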

multiprocessing.Queue supports duplex communication: data can flow from parent to child or from child to parent. But concurrent's process pool only uses it in simplex mode: the CallQueue flows from parent to child, and the ResultQueue flows from child to parent.
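
A tiny sketch of that simplex usage with multiprocessing.Queue directly, the parent producing and the child consuming (the reverse direction simply never gets used):

import multiprocessing

def consumer(q):
    print(q.get())                 # child reads from the queue

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=consumer, args=(q,))
    p.start()
    q.put("hello from the parent") # parent writes into the queue
    p.join()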

Conclusion

The concurrent.futures framework is very handy; although its internal machinery is complex, readers can use it without fully understanding those details. Just keep in mind that the task queues inside both thread pools and process pools are unbounded, so you must make sure memory does not keep climbing because consumers cannot keep up with producers.

Today the author's new book, In-depth Understanding of RPC, officially launched at a limited-time discount of 9.9 yuan; interested readers can follow the link below.

In-depth understanding of RPC: Build your own distributed, highly concurrent RPC services based on Python