Previously, in the process of learning Python, I came into contact with the knowledge points related to multi-threaded programming, which I did not thoroughly understand before. Take some time to spin the cup today and sort out the details as best you can.
The difference between thread and process
Process and thread are the basic concepts of operating system, but they are abstract and not easy to master. One of the classic sayings in the textbook about multi-process and multi-threading is that “a process is the smallest unit of resource allocation and a thread is the smallest unit of CPU scheduling”. A thread is a single sequence control flow in a program. A relatively independent, schedulable unit of execution within a process, which is the basic unit of system independent scheduling and CPU dispatch. The simultaneous running of multiple threads in a single program to accomplish different tasks is called multithreading.
Process and thread
Processes are the basic unit of resource allocation. All resources associated with the process are recorded in the process control block PCB. To indicate that the process owns these resources or is using them. In addition, the process is also the scheduling unit of the preemption processor, and it has a complete virtual address space. When a process is scheduled, different processes have different virtual address Spaces, and different threads within the same process share the same address space.
Threads, in contrast, are independent of resource allocation; they belong to a process and share its resources with other threads within the process. Threads consist only of the relevant stack (system stack or user stack) registers and the thread control table TCB. Registers can be used to store local variables in a thread, but not related variables in other threads.
Typically, there can be several threads in a process that can take advantage of the resources owned by the process. In operating systems that introduce threads, it is common to regard processes as the basic unit of resource allocation, and threads as the basic unit of independent operation and independent scheduling. Because the thread is smaller than the process, basically does not have the system resources, so the cost of its scheduling will be much smaller, can more efficiently improve the degree of concurrent execution between multiple programs in the system, so as to significantly improve the utilization rate of system resources and throughput. Therefore, threads have been introduced into the general operating system in recent years in order to further improve the concurrency of the system, and it is regarded as an important indicator of modern operating system.
The differences between threads and processes can be summarized in the following four points:
- Address space and other resources (such as open files) : processes are independent of each other and are shared by threads of the same process. Threads in one process are not visible to other processes.
- Communication: Inter-process communication IPC, threads can directly read and write process data segments (such as global variables) to communicate — need the assistance of process synchronization and mutual exclusion means to ensure data consistency.
- Scheduling and switching: Thread context switching is much faster than process context switching.
- In a multithreaded OS, a process is not an executable entity.
Multiprocess versus multithreading
Compare the dimensions | Multiple processes | multithreading | conclusion |
Data sharing and synchronization | Data sharing is complex and synchronization is simple | Data sharing is simple and synchronization is complex | Each have advantages and disadvantages |
Memory, CPU, | Large memory usage, complex switchover, low CPU usage | Low memory usage, simple switchover, and high CPU utilization | Thread dominant |
Create, destroy, switch | Complex and slow | Simple, fast | Thread dominant |
Programming and debugging | Simple programming and debugging | Programming is complicated, debugging is complicated | Process is dominant |
reliability | Processes do not affect each other | The failure of one thread causes the failure of the entire process | Process is dominant |
distributed | Suitable for multi-core, multi-machine, simple to extend to multiple machines | Suitable for multiple cores | Process is dominant |
In summary, processes and threads can also be compared to trains and carriages:
- Threads move under a process (pure cars cannot run)
- A process can contain more than one thread (a train can have more than one car)
- It is difficult to share data between different processes (passengers on one train cannot easily move to another, such as station transfers)
- Data is easily shared between different threads in the same process (it is easy to switch from car A to car B)
- Processes consume more computer resources than threads (using more trains than cars)
- Processes do not interact with each other, the failure of one thread will result in the failure of the entire process (one train will not affect another train, but if the middle car on one train catches fire, it will affect all cars on that train).
- The process can be extended to multiple machines, and the process is suitable for multiple cores at most (different trains can run on multiple tracks, and cars of the same train cannot run on different tracks).
- The memory address used by a process can be locked, meaning that when a thread uses some shared memory, other threads must wait for it to terminate before they can use it. (train bathroom) – “Mutex”
- The memory address used by the process can be limited to the amount of memory used (e.g., restaurant on train, how many people are allowed to enter, if full, you have to wait at the door until someone comes out) – “Semaphore”
The Python global interpreter lock GIL
The Global Interpreter Lock (English: Global Interpreter Lock, abbreviated GIL) is not a Python feature, but was introduced in the implementation of the Python parser (CPython). Since CPython is the default Python execution environment for most environments. Therefore, in many people’s concept of CPython as Python, GIL is automatically attributed to the defects of the Python language. What about the GIL in the CPython implementation? Here’s the official explanation:
The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.
The execution of Python code is controlled by the Python virtual machine (also known as the interpreter main loop, the CPython version). Python was designed with only one thread running in the interpreter’s main loop at a time, meaning that only one thread is running in the interpreter at any one time. Access to the Python virtual machine is controlled by the global interpreter lock (GIL), which ensures that only one thread is running at a time.
What is GIL good for? In short, it is faster in single-threaded situations and more convenient when combined with the C library without worrying about thread safety, which was the most common scenario and advantage of early Python. In addition, the design of the GIL simplifies CPython implementation so that the object model, including key built-in types such as dictionaries, is implicitly accessible concurrently. Locking the global interpreter makes it easier to implement multithreading support, but it also loses the parallel computing power of multiprocessor hosts.
In a multithreaded environment, the Python virtual machine executes as follows:
- Set the GIL
- Switch to a thread to run
- Run until a specified number of bytecode instructions are executed, or the thread voluntarily cedes control (sleep(0) can be called)
- Set the thread to sleep
- Unlock the GIL
- Repeat all of these steps again
Before Python3.2, the release logic for GIL was that the current thread encountered an IO operation or that the ticks count reached 100. This count can be adjusted by sys. setCheckInterval). Because compute-intensive thread after the release of GIL and will immediately to apply for GIL, and often in the other thread finished schedule before it has to get to GIL, will lead to once won GIL compute-intensive thread, it in a very long period of time will occupy the GIL, even till the end of the thread.
Python 3.2 starts using the new GIL. The new GIL implementation uses a fixed timeout to instruct the current thread to abandon the global lock. When the current thread holds the lock and another thread requests it, the current thread is forced to release the lock after 5 milliseconds. This improvement improves the GIL occupation of a single thread for a long time in the case of single core.
On a single-core CPU, it takes hundreds of interval checks to cause a single thread switch. Thrashing is severe on multi-core cpus. Each time the GIL lock is released, threads compete for the lock and switch threads, consuming resources. Each time CPU0 releases the GIL, the awakened thread can acquire the GIL lock, so the execution can be seamless. However, in multi-core mode, after CPU0 releases the GIL, threads on other cpus will compete, but the GIL may be immediately acquired by CPU0 again. This causes a thread on several other cpus to wake up and wait until the switch time and then enter the unscheduled state, which can result in thread thrashing, which can be even less efficient.
In addition, it can be inferred from the implementation mechanism above that Python multithreading is more friendly to IO intensive code than CPU intensive code.
Countermeasures against GIL:
- Using higher versions of Python (GIL mechanism optimized)
- Replace multiple threads with multiple processes (there is no GIL between multiple processes, but the process itself consumes more resources)
- Specify CPU running threads (using the Affinity module)
- Use Jython, IronPython, and other non-gil interpreters
- Use multithreading only for full IO intensive tasks
- Use coroutines (efficient single-threaded mode, also known as microthreading; Usually used with multiple processes)
- Key components are written in C/C++ as Python extensions, using ctypes to make Python programs directly call the export functions of dynamically linked libraries compiled in C. (With nogil call out GIL limit)
Python’s multiprocess package multiprocessing
Python’s threading package mainly uses multi-threading, but because of GIL, multi-threading in Python is not really multi-threading. In most cases, multi-processing is required to fully utilize multi-core CPU resources. The Multiprocessing package was introduced in Python 2.6, which replicated the interfaces provided by Threading for easy migration. The only difference is that it uses multiple processes instead of multithreading. Each process has its own independent GIL, so there is no GIL contention between processes.
With multiprocessing, you can easily convert from a single process to concurrent execution. Multiprocessing supports child processes, communication and sharing of data, various forms of synchronization, and provides components such as Process, Queue, Pipe, and Lock.
Background of Multiprocessing
In addition to dealing with Python’s GIL, another reason for multiprocessing is inconsistency between Windows operating systems and Linux/Unix systems.
The Unix/Linux operating system provides a fork() system call that is very special. A normal function is called once and returns once, but fork() is called once and returns twice, because the operating system automatically makes a copy of the current process (parent process) and returns it in both parent and child processes. The child process always returns 0, and the parent process returns the ID of the child. The reason for this is that a parent can fork many children, so the parent remembers the ID of each child, and the child can get the parent’s ID simply by calling getpid().
Python’s OS module encapsulates common system calls, including fork, which makes it easy to create child processes in Python programs:
import os print('Process (%s) start... ' % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid))Copy the code
The result of the above code execution on Linux, Unix, and Mac is:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.Copy the code
With fork, when a process receives a new task, it can replicate a child process to handle the new task. A common Apache server has the parent process listening on the port and forking out the child process to handle the new HTTP request whenever it comes in.
Because Windows does not have a fork call, the above code will not run on Windows. Since Python is cross-platform, it is natural to provide cross-platform multi-process support as well. The Multiprocessing module is a cross-platform version of the multiprocess module. The multiprocessing module encapsulates the fork() call so that we don’t have to worry about the details of fork(). Because Windows does not have fork calls, MultiProcessing needs to “simulate” the effect of a fork.
Multiprocessing Common components and functions
Create a management process module:
- Process (for creating a Process)
- Pool (for creating a management process Pool)
- Queue (for process communication, resource sharing)
- Value, Array (for process communication, resource sharing)
- Pipe (for Pipe communication)
- Manager (for resource sharing)
Simultaneous process module:
- Condition (Condition variable)
- An Event
- Lock (mutex)
- RLock (reentrant mutex that can be acquired multiple times by the same process without blocking)
- Semaphore
Let’s take a look at how each component and function is used.
Process (for creating a Process)
The MultiProcessing module provides a Process class to represent a Process object.
In MultiProcessing, each Process is represented by a Process class.
Constructor: Process([group [, target [, name [, args [, kwargs]]]]])
- Group: grouping, which is not actually used and is always None
- Target: represents the calling object, the task to be performed by the child process. You can pass in the method name
- Name: Sets the name of the child process
- Args: Positional argument to be passed to target as a tuple.
- Kwargs: The dictionary argument to be passed to the target function as a dictionary.
Instance methods:
- Start () : Starts the process and calls p.run() in the child process
- Run () : the method that the process runs at startup to call the function specified by target. This method must be implemented in our custom class
- Terminate () : forcibly terminates process P without any cleanup. If p creates a child, the child becomes a zombie. Use this method with special care. If P also holds a lock, it will not be released, resulting in a deadlock
- Is_alive () : returns whether the process is running. Returns True if p is still running
- Join ([timeout]) : Process synchronization. The main process waits for the child process to complete before executing the following code. The thread waits for P to terminate. Timeout is the optional timeout period (after which the parent thread will not wait for the child thread to continue executing). It is important to note that p.join can only join processes started by start, not processes started by run
Properties:
- Daemon: The default value is False. If set to True, p is the daemon running in the background. When p’s parent process terminates, p terminates with it, and when True, p cannot create its own process. Must be set before p.start()
- Name: indicates the process name
- Pid: indicates the PID of a process
- Exitcode: The process is None at run time, if -n, it is terminated by signal N.
- Authkey: The process’s authentication key, which defaults to a random 32-character string generated by os.urandom(). The purpose of this key is to provide security for low-level interprocess communication involving network connections, which can only succeed if they have the same authentication key.
Example use: (Note: on Windows Process() must be placed under if __name__ == ‘__main__’ 🙂
from multiprocessing import Process import os def run_proc(name): print('Run child process %s (%s)... ' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.')Copy the code
Pool (for creating a management process Pool)
The Pool class is used when there are many targets that need to be executed and it is too cumbersome to manually limit the number of processes. The Process class can be used when there are few targets and you don’t need to control the number of processes. The Pool can provide a specified number of processes for the user to call. When a new request is submitted to the Pool, if the Pool is not full, a new process is created to execute the request. However, if the number of processes in the pool has reached the specified maximum, the request will wait until any process in the pool terminates, and the process in the pool will be reused.
Constructor: Pool([processes[, Initializer [, initargs[, maxTasksperchild [, context]]]]])
- Processes: The number of processes to be created. If omitted, the number returned by CPU_count () is used by default.
- Initializer: Callable to be executed when each worker process starts. Default is None. If initializer is None, then initializer(*initargs) is called at the start of each worker process.
- Initargs: is the parameter group to be passed to initializer.
- Maxtasksperchild: The number of tasks that can be completed before the worker process exits and then replaced by a new worker process to free up idle resources. Maxtasksperchild defaults to None, meaning that the Pool will remain alive as long as a worker process exists.
- Context: Used to specify the context in which the worker process is started, usually a Pool is created using either Pool() or the Pool() method of a context object, both of which set the context appropriately.
Instance methods:
- Apply (func[, args[, kwargs]]) : Perform func(args,*kwargs) in a pool worker process and return the result. It is important to note that this operation does not execute func functions in all pool worker processes. To execute the func function concurrently with different arguments, you must either call the p.apply() function from a different thread or use p.apply_async(). It’s blocked. Apply rarely
- Apply_async (func[, arg[, KWDS ={}[, callback=None]]]) : Executes func(args,*kwargs) in a pool worker process and returns the result. The result of this method is an instance of the AsyncResult class, and the callback is a callable object that receives input parameters. When the func result becomes available, the understanding is passed to the callback. Callback disallows any blocking operations that would otherwise receive results from other asynchronous operations. It’s non-blocking.
- Map (func, iterable[, chunksize=None]) : a map method in the Pool class that blocks the process until a result is returned, basically the same behavior as the built-in map function. Note that although the second argument is an iterator, in practice the program will not run the child process until the entire queue is ready.
- Map_async (func, iterable[, chunksize=None]) : map_async and map apply and apply_async
- Imap () : The difference between imap and Map is that map immediately returns an iterable when all processes have finished executing and the result is returned.
- Imap_unordered () : The order of results returned is not guaranteed to be the same as the order in which processes were added.
- Close () : Closes the process pool to prevent further operations. If all operations remain suspended, they will complete before the worker process terminates.
- Join () : waits for all worker processes to exit. This method can only be called after close() or teminate() so that it no longer accepts a new Process.
- Terminate () : Terminates the work process and no more outstanding tasks are processed.
The return value of the methods apply_async() and map_async() is obj, an instance of AsyncResul. The instance has the following methods:
- Get () : Returns the result and waits for it to arrive if necessary. Timeout is optional. If it has not arrived within the specified time, an exception is thrown. If an exception is thrown in a remote operation, it will be thrown again when this method is called.
- Ready () : Returns True if the call is complete
- Successful () : Return True if the call completes and no exception is raised, or raise an exception if this method is called before the result is ready
- Wait ([timeout]) : Waits for the result to become available.
- Terminate () : Immediately terminates all worker processes without performing any cleanup or terminating any pending work. This function is automatically called if p is garbage collected
Example:
# -*- coding:utf-8 -*-
# Pool+map
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
lists = range(100)
pool = Pool(8)
pool.map(test, lists)
pool.close()
pool.join()Copy the code
# -*- coding: UTF-8 -*- # Async process Pool (non-blocking) from multiprocessing import Pool def test(I): print(I) if __name__ == "__main__": Pool = pool (8) for I in range(100): "" for loop: (1) add 100 child processes to the pool (relative parent process blocks) (2) execute 8 child processes at a time, wait for one child process to complete, immediately start a new child process. Apply_async is a pool of asynchronous processes. Asynchrony refers to the process of starting the child process asynchronously with the execution (print) of the parent process itself, whereas the process of adding the child process to the process pool in the For loop is synchronous with the execution of the parent process itself. Pool.apply_async (test, args=(I,)) print("test") pool.close() pool.join()Copy the code
# -*- coding: UTF-8 -*- # Async process Pool (non-blocking) from multiprocessing import Pool def test(I): print(I) if __name__ == "__main__": pool = Pool(8) for i in range(100): (2) Execute the child process, wait for the child process to complete the execution, put another child process into the process pool, and then execute. When the for loop is complete, print is executed. Pool.apply (test, args=(I,)) print("test") pool.close() pool.join()Copy the code
Queue (for process communication, resource sharing)
When working with multiple processes, it is best not to use shared resources. Common global variables cannot be shared by a process, only data structures constructed by the Multiprocessing component can be shared.
Queue is a class used to create queues that share resources between processes. Queue can be used to transfer data between processes (disadvantages: only applicable to the Process class, not in the Pool Process Pool).
Queue([maxsize])
- Maxsize is the maximum number of items allowed in the queue. If omitted, there is no size limit.
Instance methods:
- Put () : Inserts data into the queue. The PUT method also has two optional arguments: Blocked and timeout. If Blocked is True (the default) and timeout is positive, this method blocks for the time specified by timeout until the queue has space left. If a timeout occurs, Queue.Full is raised. If Blocked is False, but the Queue is Full, the queue.full exception is immediately raised.
- Get () : An element can be read and deleted from the queue. The get method has two optional arguments: blocked and timeout. If Blocked is True (the default) and timeout is positive, no elements are fetched during the wait time, and queue.Empty is raised. If Blocked is False, there are two cases: if a value is available in the Queue, it is returned immediately; otherwise, if the Queue is Empty, the queue.empty exception is raised immediately. If you do not want to throw an exception when empty, make Blocked True or empty all parameters.
- With q.g et (False) get_nowait () :
- With q.p ut (False) put_nowait () :
- Empty () : Returns True if q is empty when this method is called, which is unreliable, for example if an item is added to the queue in the process of returning True.
- Full () : Returns True if q is full when this method is called, which is unreliable, such as if items in the queue are fetched in the process of returning True.
- Qsize () : Returns the correct number of current items in the queue, which is also unreliable, for the same reason as q.email () and q.null ()
Example:
from multiprocessing import Process, Queue import os, time, random def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue... ' % value) q.put(value) time.sleep(random.random()) def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) if __name__ == "__main__": q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, Args =(q,)) pw.start() pr.start() pw.join() # Wait for pw to terminate PR.terminate () #Copy the code
A JoinableQueue is like a Queue object, but a Queue allows the consumer of an item to notify the producer that the item has been successfully processed. Notification processes are implemented using shared signals and condition variables.
Constructor: JoinableQueue([maxsize])
- Maxsize: indicates the maximum number of items allowed in a queue. If omitted, there is no size limit.
Instance methods
Instance P of JoinableQueue has the same methods as Queue objects:
- Task_done () : The consumer uses this method to signal that the return item of q.et () has been processed. If this method is called more times than the number of items removed from the queue, ValueError is raised
- Join (): The producer calls this method to block until all items in the queue have been processed. Blocking continues until the q.ask_done () method is called for each item in the queue
Example:
# -*- coding:utf-8 -*- from multiprocessing import Process, JoinableQueue import time, random def consumer(q): While True: res = q.producer () print() def producer(seq, q): for item in seq: Time. Sleep (random.randrange(1,2)) q.puff (item) print(' producer done %s' % item) q.jin () if __name__ == "__main__": Q = JoinableQueue() seq = (' product %s' % I for I in range(5)) p = Process(target=consumer, Args =(q,)) p.daemon = True # set to daemon, p stops when the main thread stops, but don't worry, P.start () producer(seq, q) print(' main thread ')Copy the code
Alue, Array (for process communication, resource sharing)
Both values and arrays in multiprocessing are implemented by creating ctypes() objects in shared memory to share data. They are implemented in much the same way, but with different ctypes.
Value
Constructor: Value((typecode_or_type, args[, lock])
- Typecode_or_type: Defines the Type of the ctypes() object. Type code or C Type can be passed in the following table.
- Args: Parameter passed to the typecode_or_type constructor
- Lock: Default to True, creates a mutex to restrict access to the Value object, and if passed a lock, such as an instance of lock or RLock, will be used for synchronization. If False is passed, the instance of Value will not be locked and will not be process-safe.
Typecode_or_type Supported types:
| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char | int | 1 |
| `'B'` | unsigned char | int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long | int | 4 |
| `'L'` | unsigned long | int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float | float | 4 |
| `'d'` | double | float | 8 |Copy the code
Reference address: docs.python.org/3/library/a…
Array
Constructor: Array(typecode_or_type, size_or_initializer, ** KWDS [, lock])
- Typecode_or_type: same as above
- Size_or_initializer: If it is an integer, it determines the length of the array and the array will be initialized to zero. Otherwise, size_or_initializer is the sequence used to initialize the array, whose length determines the length of the array.
- KWDS: Parameter passed to the typecode_or_type constructor
- Lock: same as above
Example:
Import multiprocessing def f(n, a): n.value = 3.14a [0] = 5 if __name__ == '__main__': Num = multiprocessing.Value('d', 0.0) arr = multiprocessing.Array(' I ', range(10)) p = multiprocessing.Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])Copy the code
Note: Value and Array only apply to the Process class.
Pipe (for Pipe communication)
There is another way for multiple processes to transmit data called pipes, which works in the same way as queues. Pipe creates a Pipe between processes and returns a tuple (conn1,conn2), where conn1,conn2 represent connection objects at both ends of the Pipe, emphasizing that the Pipe must be generated before the Process object is generated.
Constructor: Pipe([duplex])
- Dumplex: The default pipe is full duplex. If duplex is False, conn1 can only be used for receiving and conn2 can only be used for sending.
Instance methods:
- Send (obj) : Sends objects over a connection. Obj is any object compatible with serialization
- Recv () : receives the object sent by conn2.send(obj). The recV method blocks until there is no message to receive. If the other end of the connection is closed, the recv method throws an EOFError.
- Close (): closes the connection. This method is automatically called if conn1 is garbage collected
- Fileno (): Returns the integer file descriptor used by the connection
- Poll ([timeout]): Returns True if the data on the connection is available. Timeout Specifies the maximum time to wait. If this parameter is omitted, the method returns the result immediately. If timeout is shot to None, the operation waits indefinitely for data to arrive.
- Recv_bytes ([maxlength]): Receives a complete byte message sent by the c.send_bytes() method. Maxlength Specifies the maximum number of bytes to receive. If incoming messages exceed this maximum value, IOError is raised and no further reading is possible on the connection. EOFError is raised if the other end of the connection is closed and there is no more data.
- Send_bytes (buffer [, offset [, size]]) : Send a buffer of byte data over the connection. Buffer is any object that supports the buffer interface, offset is the byte offset in the buffer, and size is the number of bytes to be sent. The resulting data is sent as a single message, which is then received by calling the c.recv_bytes() function
- Recv_bytes_into (buffer [, offset]): Receives a complete byte message and stores it in a buffer object that supports a writable buffer interface (that is, a bytearray object or similar). Offset Specifies the byte shift in the buffer where the message is placed. The return value is the number of bytes received. If the message length is larger than the available buffer space, a BufferTooShort exception is raised.
Example:
From multiprocessing import Process, Pipe import time def f(Subconn): Print (" from dad :", subconn.recv ()) subconn.close () if __name__ == "__main__": Parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) Parent_conn parent_conn. Recv ()). The send (" um ")Copy the code
Manager (for resource sharing)
The Manager object returned by Manager() controls a Server process containing Python objects that can be accessed by other processes via proxies. Thus achieve multi-process data communication and security. The Manager module is often used with the Pool module.
Support Manager of type list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, the Value and the Array.
Managers are standalone child processes in which real objects exist and run as servers, and other processes access shared objects by using proxies that run as clients. Manager(), a subclass of BaseManager, returns a started instance of SyncManager() that can be used to create shared objects and return proxies that access those shared objects.
BaseManager, the base class that creates the manager server
BaseManager([address[, authKey]])
- Address :(hostname,port) specifies the url address of the server. By default, an idle port is simply allocated
- Authkey: Authentication for the client connecting to the server, default to current_process(). Authkey value
Instance methods:
- Start ([Initializer [, initargs]]) : Starts a separate child process and starts the manager server in that child process
- Get_server () : Gets the server object
- Connect () : Connection manager object
- Shutdown () : Closes the manager object and can only be called after the start() method has been called
Instance attributes:
- Address: Read-only property, the address being used by the manager server
SyncManager, the following types are not process-safe and need to be locked..
Instance methods:
- Array(self,*args,**kwds)
- BoundedSemaphore(self,*args,**kwds)
- Condition(self,*args,**kwds)
- Event(self,*args,**kwds)
- JoinableQueue(self,*args,**kwds)
- Lock(self,*args,**kwds)
- Namespace(self,*args,**kwds)
- Pool(self,*args,**kwds)
- Queue(self,*args,**kwds)
- RLock(self,*args,**kwds)
- Semaphore(self,*args,**kwds)
- Value(self,*args,**kwds)
- dict(self,*args,**kwds)
- list(self,*args,**kwds)
Example:
import multiprocessing def f(x, arr, l, d, n): X.v alue arr = 3.14 [0] = 5 L.A. ppend (' Hello ') d [1] = 2 n.a. = 10 if __name__ = = "__main__ ': Server = multiprocessing.manager () x = server.value ('d', 0.0) arr = server.array (' I ', range(10)) l = server.list() d = server.dict() n = server.Namespace() proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n)) proc.start() proc.join() print(x.value) print(arr) print(l) print(d) print(n)Copy the code
Simultaneous process module
Lock (mutex)
Lock The Lock prevents access conflicts when multiple processes need to access shared resources. Locking ensures that when multiple processes modify the same block of data, only one modification can be made at a time, namely serial modification, sacrificing speed but ensuring data security. Lock contains two states — locked and unlocked — and two basic methods.
Constructor: Lock()
Instance methods:
- Acquire ([timeout]): Causes the thread to enter a synchronous blocking state, attempting to acquire the lock.
- Release (): Releases the lock. The thread must have acquired the lock before use, otherwise an exception will be thrown.
Example:
from multiprocessing import Process, Lock def l(lock, num): lock.acquire() print("Hello Num: %s" % (num)) lock.release() if __name__ == '__main__': lock = lock () # Process(target=l, args=(lock, num)).start()Copy the code
RLock (reentrant mutex that can be acquired multiple times by the same process without blocking)
RLock (reentrant lock) is a synchronization instruction that can be requested multiple times by the same thread. RLock uses the concept of “owned threads” and “recursive levels”, and is owned by a thread while locked. A thread with an RLock can call acquire() again and release() the same number of times to release the lock. You can think of RLock as containing a lock pool and a counter with an initial value of 0. Each successful call to acquire()/release(), the counter will be +1/-1 and the lock will be unlocked at 0.
Constructor: RLock()
Instance methods:
- Acquire ([timeout]) : with the Lock
- Release () : with the Lock
Semaphore
Semaphores are a more advanced locking mechanism. A semaphore has a counter inside it, unlike a lock object that has a lock identifier inside it, and a thread blocks only if the number of threads occupying the semaphore exceeds the number. This allows multiple threads to access the same code area simultaneously. For example, if the toilet has three pits, only three people can go to the toilet, and the people behind can only wait for someone to come out of the toilet. If the specified semaphore is 3, then one person will get a lock, and the count will be increased by 1. When the count is 3, everyone behind will have to wait. Once released, someone can get a lock.
Semaphore([value])
- Value: Sets the semaphore. The default value is 1
Instance methods:
- Acquire ([timeout]) : with the Lock
- Release () : with the Lock
Example:
from multiprocessing import Process, Semaphore import time, random def go_wc(sem, user): Sem.acquire () print('%s') time.sleep(random.randint(0, 3)) sem.release() print(user, 'OK') if __name__ == '__main__': sem = Semaphore(2) p_l = [] for i in range(5): p = Process(target=go_wc, args=(sem, 'user%s' % i,)) p.start() p_l.append(p) for i in p_l: i.join()Copy the code
Condition (Condition variable)
Condition can be thought of as an advanced Lock that provides more advanced functionality than Lock or RLock and allows us to control complex thread synchronization issues. Condition maintains a lock object internally (RLock by default) and can pass in the lock object as a parameter when creating a Condigtion object. Condition also provides acquire, release methods, which have the same meaning as lock acquire, release methods. In fact, it simply calls the corresponding methods of the internal lock object. Condition provides other methods as well.
Condition([lock/rlock])
- You can pass an instance of Lock/RLock to the constructor, otherwise it will generate an instance of RLock itself.
Instance methods:
- Acquire ([timeout]) : First acquire, then judge some conditions. If the condition is not met, wait
- Release () : Releases the Lock
- Wait ([timeout]): Calling this method causes the thread to enter the Condition’s wait pool for notification and release the lock. The thread must have acquired the lock before use, otherwise an exception will be thrown. A thread in wait state rejudges the condition after being notified.
- Notify (): Calling this method picks a thread from the wait pool and notifies it. The thread receiving the notification will automatically call Acquire () to try to acquire the lock (into the lock pool); Other threads are still in the wait pool. Calling this method does not release the lock. The thread must have acquired the lock before use, otherwise an exception will be thrown.
- NotifyAll (): Calling this method notifies all the threads in the wait pool, and they all enter the lock pool to try to obtain the lock. Calling this method does not release the lock. The thread must have acquired the lock before use, otherwise an exception will be thrown.
Example:
import multiprocessing
import time
def stage_1(cond):
"""perform first stage of work,
then notify stage_2 to continue
"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
print('{} done and ready for stage 2'.format(name))
cond.notify_all()
def stage_2(cond):
"""wait for the condition telling us stage_1 is done"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
cond.wait()
print('{} running'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name='s1',
target=stage_1,
args=(condition,))
s2_clients = [
multiprocessing.Process(
name='stage_2[{}]'.format(i),
target=stage_2,
args=(condition,),
)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()Copy the code
An Event
The Event contains a flag bit, which is initially false. You can use set() to set it to true; Or use clear() to reset it to false; You can use is_set() to check the status of the flag bit; The other most important function is wait(timeout=None), which blocks the current thread until the event’s internal flag bit is set to true or timeout times out. Wait () returns if the internal flag bit is true.
Example:
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')Copy the code
Other content
Dummy is a multithreaded module, while multiprocessing is a multiprocess module. Dummy apis are common. So it’s easy to switch code between multiple threads and multiple processes. Multiprocessing. dummy is usually tried in AN I/O scenario, for example by introducing a thread pool as follows.
from multiprocessing.dummy import Pool as ThreadPoolCopy the code
Dummy: Dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy: dummy.
Reference Documents:
- Docs.python.org/3/library/m…
- www.rddoc.com/doc/Python/…
Concurrent. futures in Python
The Python standard library provides the threading and Multiprocessing modules for writing multithreaded/multiprocess code. Starting with Python3.2, the library provides us with the concurrent.futures module, which provides the ThreadPoolExecutor and ProcessPoolExecutor classes, Implements higher levels of abstraction from threading and multiprocessing and provides direct support for writing thread pools/process pools. Concurrent. futures Base modules are executor and future.
Executor
Executor is an abstract class that cannot be used directly. It defines some basic methods for specific asynchronous execution. ThreadPoolExecutor and ProcessPoolExecutor inherit Executor and are used to create code for thread pools and process pools, respectively.
ThreadPoolExecutor object
The ThreadPoolExecutor class is an Executor subclass that uses a thread pool to make asynchronous calls.
class concurrent.futures.ThreadPoolExecutor(max_workers)Copy the code
Asynchronous calls are made using a thread pool of max_workers number.
ProcessPoolExecutor object
The ThreadPoolExecutor class is an Executor subclass that uses a pool of processes to make asynchronous calls.
class concurrent.futures.ProcessPoolExecutor(max_workers=None)Copy the code
Asynchronous calls are made using a process pool with the number of max_workers, or the number of processors on the machine if max_workers is None (for example, on a 4-core machine with max_worker set to None, four processes are used for asynchronous concurrency).
Submit () method
Executor defines the submit() method, which submits an executable callback task and returns a Future instance. The Future object represents the given call.
Executor.submit(fn, *args, **kwargs)
- Fn: function that needs to be executed asynchronously
- *args, **kwargs: fn parameter
Example:
from concurrent import futures
def test(num):
import time
return time.ctime(), num
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test, 1)
print(future.result())Copy the code
The map () method
In addition to Submit, Exectuor also provides us with a map method that returns a map(func, *iterables) iterator in which the callback execution returns an ordered result.
Executor.map(func, *iterables, timeout=None)
- Func: functions that need to be executed asynchronously
- * Iterables: objects that can be iterated over, such as lists. Each func execution takes arguments from iterables.
- Timeout: Set the timeout period for each asynchronous operation. The value of timeout can be an int or float. If the operation times out, raisesTimeoutError is returned. If the timeout parameter is not specified, the timeout time is not specified.
Example:
from concurrent import futures
def test(num):
import time
return time.ctime(), num
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test, data):
print(future)Copy the code
Shutdown () method
Free system resources, called after an asynchronous operation such as executor.submit () or executor.map (). Using the with statement avoids calling this method explicitly.
Executor.shutdown(wait=True)
Future
A Future can be understood as an operation to be completed in the Future, which is the foundation of asynchronous programming. Typically, we perform IO operations, access urls (below) will block until we wait for the results to return, and the CPU can’t do anything else. The Future is introduced to help us do other things while we wait.
The Future class encapsulates callable asynchronous execution. The Future instance is created through the executor.submit () method.
- Cancel () : Attempts to cancel the call. If the call is currently executing and cannot be cancelled, the method returns False, otherwise the call is cancelled and the method returns True.
- Cancelled () : Returns True if the call is cancelled successfully.
- Running () : Returns True if the call is currently executing and cannot be cancelled.
- Done () : Returns True if the call was successfully cancelled or terminated.
- Result (timeout=None) : Returns the value returned by the call. If the call has not completed, the method will wait timeout seconds. If the call does not complete within timeout seconds, a Futures.TimeoutError will be reported. Timeout can be an integer or a floating-point value. If timeout is not specified or None, the wait time is infinite. If futures is cancelled before completion, CancelledError will be raised.
- Exception (timeout=None) : Returns an exception thrown by the call. If the call has not yet completed, the method waits for the time specified by timeout, and if the call has not completed after that time, the timeout error futures.TimeoutError is reported. Timeout can be an integer or a floating-point value. If timeout is not specified or None, the wait time is infinite. If futures is cancelled before completion, CancelledError will be raised. If the call completes and no exceptions are reported, None is returned.
- Add_done_callback (fn) : Binds callable FN to the future. When the future is cancelled or finished running, fn will be called as the only argument to the future. If the future has been run or cancelled, fn will be called immediately.
- wait(fs, timeout=None, return_when=ALL_COMPLETED)
- Wait for the Future instance (Possibly created by different Executor Instances) provided by FS to finish running. Returns a named 2-element collection, with sub-tables representing completed and unfinished
- Return_when indicates when the function should return. Its value must be one of the following:
- FIRST_COMPLETED: The function returns when any future is completed or cancelled.
- FIRST_EXCEPTION: This function returns when any future ends because of an exception. If no future has an error, the effect is equal to
- ALL_COMPLETED: The function does not return until all futures are completed.
- As_completed (fs, timeout=None) : The argument is a list of Future instances and the return value is an iterator that produces the Future instance at the end of the run.
Example:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
sleep(randint(1, 5))
return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
print(1)
for x in as_completed(futures):
print(x.result())
print(2)Copy the code
Reference links:
- pythonhosted.org/futures/
- www.rddoc.com/doc/Python/…
- Hellowac. Making. IO/programing %…
Next period: coroutine, please look forward to.