Overview

Traditional multithreaded designs use a “create on demand, destroy on completion” strategy: a thread is created when a task arrives and destroyed as soon as the task finishes. Although creating a thread is much cheaper than creating a process, if tasks are submitted very frequently and each one runs only briefly, the server spends much of its time creating and destroying threads.

The lifetime of a thread can be divided into three parts: the time to start the thread, the time to run the thread body, and the time to destroy the thread. If threads cannot be reused, every task pays for all three steps: start, run, and destroy. The start and destroy steps are pure overhead, so they add to system time and reduce efficiency.

Using a thread pool: threads are created in advance and placed in a pool. A thread is not destroyed after it finishes its current task; instead, it is scheduled to work on the next one. This avoids repeated thread creation and destruction, which saves that overhead and gives better performance and system stability.
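As a rough illustration of the difference, the sketch below runs the same short tasks once with a new thread per task and once on a small reusable pool. It uses the standard library's concurrent.futures.ThreadPoolExecutor, which is not the approach used in the rest of this article; the names short_task, run_with_new_threads and run_with_pool, and the task count, are made up for the example.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def short_task(n):
    # A deliberately tiny job: thread start-up cost dominates the real work.
    return n * n


def run_with_new_threads(count):
    # "Create on demand, destroy on completion": one new thread per task.
    results = [None] * count

    def wrapper(i):
        results[i] = short_task(i)

    threads = [threading.Thread(target=wrapper, args=(i,)) for i in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, len(threads)


def run_with_pool(count, workers=3):
    # The same tasks on a fixed set of reusable worker threads.
    names = set()

    def wrapper(i):
        names.add(threading.current_thread().name)
        return short_task(i)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(wrapper, range(count)))
    return results, len(names)


if __name__ == '__main__':
    print(run_with_new_threads(20)[1])  # threads created: 20, one per task
    print(run_with_pool(20)[1])         # threads used: at most 3
```

Both functions return the same results; the only difference is how many threads had to be created to get them.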

Thread pool model

A thread pool can be built in two ways: by creating instances of Thread directly, or by defining a class that inherits from threading.Thread. The following example creates Thread instances directly:

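    import threading
    import time
    from queue import Queue
    from threading import Thread

    queue = Queue()


    def do_job():
        # each worker loops forever, blocking on get() until a task arrives
        while True:
            i = queue.get()
            time.sleep(1)
            print('index %s, curent: %s' % (i, threading.current_thread()))
            queue.task_done()


    if __name__ == '__main__':
        # create a pool of 3 threads
        for i in range(3):
            t = Thread(target=do_job)
            t.daemon = True  # the worker ends when the main thread exits
            t.start()

        time.sleep(3)
        for i in range(10):
            queue.put(i)

        # block until task_done() has been called for every item
        queue.join()
        print('finish')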
  • About daemon: if a child thread's daemon attribute is False, the program does not exit when the main thread finishes; it waits for that child thread to finish first. If a child thread's daemon attribute is True, the program exits without waiting for it, and every daemon thread is terminated along with the main thread, whether or not it has finished its work. daemon=True marks a thread as a daemon thread. The workers here loop forever and nothing outside them can ask them to stop, so they are made daemon threads in order that they end when the main thread exits.
  • About queue.task_done(): queue.join() blocks until every item on the queue has been processed, but how does the queue know that? queue.get() only tells the queue that an item has been taken off it, not that the work on that item is finished. The worker therefore calls queue.task_done() after each task to report that one more task is complete. Once every task has been reported, queue.join() returns and the main program can exit.

The output

index 1, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 0, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 2, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 4, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 3, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 5, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 6, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 7, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 8, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 9, curent: <Thread(Thread-1, started daemon 139652189157120)>
finish

You can see that all ten tasks were handled by the same three threads, Thread-1 through Thread-3.
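To see the bookkeeping behind queue.join() in isolation, here is a small sketch. The helper join_returns_within is hypothetical; it just runs q.join() in a helper thread and reports whether it came back within a timeout.

```python
import threading
from queue import Queue


def join_returns_within(q, timeout):
    # Run q.join() in a helper thread and report whether it returned in time.
    waiter = threading.Thread(target=q.join, daemon=True)
    waiter.start()
    waiter.join(timeout)
    return not waiter.is_alive()


q = Queue()
for i in range(3):
    q.put(i)

# Take every item off the queue, but do not acknowledge any of them yet.
items = [q.get() for _ in range(3)]
empty_after_get = q.empty()                  # True: nothing left to fetch
joined_before = join_returns_within(q, 0.2)  # False: 3 tasks unacknowledged

# Acknowledge each item; only now can join() return.
for _ in items:
    q.task_done()
joined_after = join_returns_within(q, 0.2)   # True

print(empty_after_get, joined_before, joined_after)  # True False True
```

An empty queue is therefore not enough for join() to return: it waits for one task_done() call per item that was put on the queue.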

Thread Pool Principle

Basic principle of a thread pool: put the tasks on a queue and start N threads. Each thread fetches a task from the queue, processes it, reports that the task is done, and then fetches the next one. This continues until every task on the queue has been processed, at which point the program can exit.

The example above creates a pool of three threads. Each thread runs an infinite loop that blocks while reading tasks from the queue, so all tasks are handled by just those three pre-created threads.

In detail, it works as follows:

  1. Create a Queue instance and fill it with data or tasks
  2. Create the pool of worker threads and mark each one as a daemon thread
  3. Each thread runs an infinite loop that blocks reading an item from the queue and then processes it
  4. After each task is completed, call queue.task_done() to signal to the queue that the task is finished
  5. The main thread blocks on queue.join() until every task has been processed, then unblocks and continues

There are a few caveats to this pattern:

  • Making the pool threads daemon threads means they exit automatically when the main thread exits. With the default daemon=False, non-daemon threads keep the program from exiting: the pool threads sit in an infinite loop waiting for tasks even after the queue has been emptied, so the program would never exit.
  • With queue.join(), the main thread blocks until every task on the queue has been processed. How does it know? Every queue.put() increases the queue's count of unfinished tasks by 1, and every queue.task_done(), called after queue.get() once the task has been processed, decreases it by 1. When the count reaches zero, queue.join() unblocks and execution continues.
  • In this pattern the queue drives the program: it exits once the tasks on the queue are done, and because the pool threads are daemon threads they all exit with the main thread. This differs from the more familiar thread.join(), which blocks the main thread until that particular thread has finished. Use queue.join() when you want to finish a given amount of work on a queue, such as crawling a specified number of pages; use thread.join() when you want to wait for a particular thread to finish.
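For comparison, here is a sketch of the thread.join() alternative. It is not part of the example above: the workers are ordinary non-daemon threads, and a None sentinel (an assumption of this sketch, as are the names STOP, worker and run) tells each worker to leave its loop so that thread.join() can return.

```python
import threading
from queue import Queue

STOP = None  # hypothetical sentinel meaning "no more work"


def worker(tasks, results):
    while True:
        item = tasks.get()
        if item is STOP:
            break              # leave the loop so the thread can finish
        results.put(item * 2)  # stand-in for real work


def run(items, thread_num=3):
    items = list(items)
    tasks, results = Queue(), Queue()
    threads = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(thread_num)]
    for t in threads:
        t.start()
    for item in items:
        tasks.put(item)
    for _ in threads:
        tasks.put(STOP)        # one sentinel per worker
    for t in threads:
        t.join()               # wait for the threads, not for the queue
    return sorted(results.get() for _ in items)


if __name__ == '__main__':
    print(run(range(10)))
```

Here the threads decide when the program is done, so they need an explicit way to stop; with queue.join() and daemon threads, the queue decides and the threads are simply abandoned at exit.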

Example: writing a web server with a thread pool

    import socket
    import threading
    from queue import Queue
    from threading import Thread

    host = ''
    port = 8888
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen()


    class ThreadPoolManger():
        """Thread pool manager."""

        def __init__(self, thread_num):
            self.work_queue = Queue()
            self.thread_num = thread_num
            self.__init_threading_pool(self.thread_num)

        def __init_threading_pool(self, thread_num):
            # create the pool of worker threads
            for i in range(thread_num):
                thread = ThreadManger(self.work_queue)
                thread.start()

        def add_job(self, func, *args):
            # put the task on the queue, where a worker thread will pick it up
            self.work_queue.put((func, args))


    class ThreadManger(Thread):
        """Worker thread that takes tasks from the work queue and runs them."""

        def __init__(self, work_queue):
            Thread.__init__(self)
            self.work_queue = work_queue
            self.daemon = True

        def run(self):
            # loop forever, blocking on get() until a task is available
            while True:
                target, args = self.work_queue.get()
                target(*args)
                self.work_queue.task_done()


    # create a pool of 4 threads
    thread_pool = ThreadPoolManger(4)


    # handle an HTTP request; here it simply returns 200 hello world
    def handle_request(conn_socket):
        recv_data = conn_socket.recv(1024)
        reply = 'HTTP/1.1 200 OK \r\n\r\n'
        reply += 'hello world'
        print('thread %s is running ' % threading.current_thread().name)
        conn_socket.send(reply.encode())
        conn_socket.close()


    # loop, waiting to receive client requests
    while True:
        conn_socket, addr = s.accept()
        # when a request arrives, hand the socket to handle_request via the pool
        thread_pool.add_job(handle_request, *(conn_socket,))

    s.close()

Running the server and listing its threads:

    /data/web/advance_python/socket]$ python sock_s_threading_pool
    [master][/data/web/advance_python/socket]$ ps -eLf|grep sock_s_threading_pool
    lisa+ 27488 23705 27488 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py
    lisa+ 27488 23705 27489 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py
    lisa+ 27488 23705 27490 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py
    lisa+ 27488 23705 27491 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py
    lisa+ 27488 23705 27492 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py
    # As expected there are five threads: one main thread and four thread pool threads

The thread pool web server consists of the following components and steps:

  • Define the thread pool manager ThreadPoolManger, which creates and manages the pool and provides the add_job() interface for adding tasks to it
  • Define the worker thread ThreadManger, whose run() method loops forever over the work queue and executes the queued tasks
  • Define the socket that listens for requests with s.accept(), and the handle_request() task that handles them
  • Initialize a pool of four threads, each of which blocks waiting to read a task from the queue
  • When s.accept() returns a request, pass handle_request together with conn_socket as its argument to the thread pool and wait for the pool to assign a thread to process it
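One way to exercise a server built this way without a browser is a small self-contained smoke test. The sketch below is a condensed stand-in for the server, not the code above: it uses plain functions instead of the two classes, binds to 127.0.0.1 on a port chosen by the operating system rather than 8888, and adds a hypothetical fetch() client.

```python
import socket
import threading
from queue import Queue


def handle_request(conn_socket):
    # Same idea as the example server: read the request, send a fixed reply.
    conn_socket.recv(1024)
    conn_socket.send(b'HTTP/1.1 200 OK\r\n\r\nhello world')
    conn_socket.close()


def worker(work_queue):
    # Pool thread: loop forever taking (function, args) tasks off the queue.
    while True:
        func, args = work_queue.get()
        func(*args)
        work_queue.task_done()


def serve(listener, work_queue):
    # Accept loop: hand every new connection to the pool.
    while True:
        conn_socket, _ = listener.accept()
        work_queue.put((handle_request, (conn_socket,)))


def fetch(port):
    # Bare-bones client: send a request and read until the server closes.
    with socket.create_connection(('127.0.0.1', port), timeout=5) as c:
        c.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        chunks = []
        while True:
            data = c.recv(1024)
            if not data:
                break
            chunks.append(data)
    return b''.join(chunks)


listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
listener.listen()
port = listener.getsockname()[1]

work_queue = Queue()
for _ in range(4):
    threading.Thread(target=worker, args=(work_queue,), daemon=True).start()
threading.Thread(target=serve, args=(listener, work_queue), daemon=True).start()

responses = [fetch(port) for _ in range(8)]
print(all(r.endswith(b'hello world') for r in responses))
```

Eight requests are served by only four pool threads, which is the reuse the pool is meant to provide.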

GIL’s impact on multithreading

Python threads are real operating-system threads, but the interpreter executes code under the GIL (Global Interpreter Lock). Any Python thread must acquire the GIL before it can execute. In Python 2 the interpreter automatically releases the GIL after every 100 bytecode instructions to give other threads a chance to run; since Python 3.2 it does so after a fixed time interval (5 ms by default) instead. The GIL effectively serializes the execution of Python code across all threads, so threads can only take turns: even if 100 threads run on a 100-core CPU, only one core is used.
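In CPython 3.2 and later, the point at which a running thread is asked to give up the GIL is time-based, and the interval can be inspected from the sys module. A minimal check, assuming a standard CPython 3 interpreter:

```python
import sys

# How long a thread may hold the GIL before the interpreter asks it to
# let another thread run (in seconds).
interval = sys.getswitchinterval()
print(interval)  # 0.005 unless it has been changed with sys.setswitchinterval()
```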

Threads also give up the GIL voluntarily, which is called cooperative multitasking. When a thread starts a task such as network I/O and will not need to run any Python code for a long or indefinite period of time, it yields the GIL so that another thread can acquire it and run Python. This polite behavior allows concurrency: multiple threads can be waiting for different events at the same time.

For example, suppose two threads each open a socket connection. Only one of the two can execute Python at any moment, but as soon as a thread starts connecting it drops the GIL so the other can run. This means both threads can wait for their socket connections concurrently, which is a good thing: they get more work done in the same amount of time.
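The effect is easy to observe with a sketch that uses time.sleep() as a stand-in for a blocking I/O call (sleep releases the GIL just as a blocking recv() or accept() does). The names wait_for_io and timed_run are made up for the example.

```python
import threading
import time


def wait_for_io(seconds):
    # Stand-in for blocking I/O: the GIL is released while the thread waits.
    time.sleep(seconds)


def timed_run(thread_num, seconds):
    # Start thread_num waiting threads and measure the total wall-clock time.
    threads = [threading.Thread(target=wait_for_io, args=(seconds,))
               for _ in range(thread_num)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


elapsed = timed_run(4, 0.5)
# The four half-second waits overlap, so the total is close to 0.5 s
# rather than the 2 s they would take one after another.
print('4 threads, 0.5 s wait each: %.2f s total' % elapsed)
```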

How large should the thread pool be?

A server has a limited number of CPU cores, so the number of threads that can usefully run at once is limited too; more threads is not always better. Thread switching has a cost, and if threads switch too frequently performance actually degrades.

During thread execution, time is divided into two parts:

  • CPU computation, which occupies the CPU
  • Waiting for I/O to return, in operations such as recv(), accept(), and sleep(), during which the CPU is not used. Typical cases are cache access, RPC calls to downstream services, and database access, all of which involve network calls

If computation takes 50% of the time and waiting takes 50%, two threads give the best utilization. Suppose a task takes 2 seconds: 1 second of CPU computation followed by 1 second of waiting for I/O. While the first thread waits, the CPU is idle, so it can switch to a second thread and compute for 1 second. When the second thread starts its 1-second I/O wait, the first thread has just finished waiting, so the CPU switches back and keeps working. Alternating between the two threads keeps the CPU busy all the time.

Likewise, if computation takes 20% of the time and waiting takes 80%, five threads give the best utilization. Imagine a task that takes 5 seconds: 1 second of CPU time and 4 seconds of waiting. While one thread waits, the CPU can run the other four threads in turn, so CPU work and I/O waiting overlap as much as possible.

In summary, the number of threads can be calculated as follows: on an N-core server, if each task spends time x computing locally and time y waiting, set the number of worker threads (pool threads) to N*(x+y)/x to maximize CPU utilization. Because of the GIL, Python can use only one core, so set N=1 here.
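The formula is simple enough to wrap in a helper. The function name pool_size is hypothetical, and rounding up to a whole number of threads is an assumption of this sketch, since the formula itself does not say how to handle fractions.

```python
import math


def pool_size(cores, compute_time, wait_time):
    # N * (x + y) / x, rounded up to a whole number of threads (assumption).
    if compute_time <= 0:
        raise ValueError('compute_time must be positive')
    return math.ceil(cores * (compute_time + wait_time) / compute_time)


print(pool_size(1, 1, 1))  # 50% compute, 50% wait -> 2
print(pool_size(1, 1, 4))  # 20% compute, 80% wait -> 5
```

The two calls reproduce the worked examples above with N=1.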
