Process synchronization (locking)
Processes do not share data, but they do share the same file system, so accessing the same file or the same print terminal is possible. Sharing, however, brings competition, and the result of competition is disorder; the way to bring it under control is locking.
Part1: Multiple processes printing to the same terminal run concurrently, which is efficient, but they compete for the same terminal, so the output becomes disordered.
from multiprocessing import Process
import os,time

def work():
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=work)
        p.start()
Locking: changing concurrent execution into serial execution, sacrificing efficiency but avoiding competition.
from multiprocessing import Process,Lock
import os,time

def work(lock):
    lock.acquire()
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())
    lock.release()

if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,))
        p.start()
Locking ensures that when multiple processes modify the same block of data, only one task can modify it at a time, that is, modification becomes serial. Speed is sacrificed, but data safety is guaranteed.
While it is possible to use file sharing data for interprocess communication, the problem is:
1. Low efficiency (Shared data is based on files, and files are data on hard disks)
2. You need to lock it yourself
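As a concrete sketch of both problems, here is a minimal example (the buy_ticket function and the tickets file are illustrative assumptions, not code from the original): several processes each decrement a ticket count stored in a file, and the whole read-modify-write must be wrapped in a lock that we manage ourselves.

```python
import json
import os
import tempfile
from multiprocessing import Lock, Process

def buy_ticket(lock, path):
    """Decrement a ticket counter stored in a file; the lock we manage
    ourselves makes the read-modify-write atomic across processes."""
    with lock:  # hold the lock around the whole read-modify-write
        with open(path) as f:
            n = json.load(f)          # read: data lives on disk, so this is slow
        if n > 0:
            with open(path, 'w') as f:
                json.dump(n - 1, f)   # write the decremented count back

if __name__ == '__main__':
    path = os.path.join(tempfile.gettempdir(), 'tickets.json')
    with open(path, 'w') as f:
        json.dump(10, f)  # ten tickets to start with
    lock = Lock()
    ps = [Process(target=buy_ticket, args=(lock, path)) for _ in range(3)]
    for p in ps: p.start()
    for p in ps: p.join()
    with open(path) as f:
        print(json.load(f))  # 10 - 3 = 7
```

Without the lock, two processes could both read 10 and both write 9, losing a decrement; with it, the result is always deterministic.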
Therefore, it is best to find a solution that is both efficient (multiple processes sharing a block of memory) and handles the locking for us. This is the message-based IPC mechanism that the multiprocessing module provides: queues and pipes.
Both queues and pipes store data in memory
Queues, in turn, are built on top of pipes plus locks, freeing us from complex locking problems.
We should avoid sharing data as much as possible, use messaging and queues as much as possible, avoid dealing with complex synchronization and locking issues, and often get better scalability as the number of processes increases.
Queues (recommended)
Processes are isolated from each other. To implement interprocess communication (IPC), the multiprocessing module supports two forms, queues and pipes, both of which use message passing.
The class for creating queues (the underlying implementation uses pipes and locks):
Queue([maxsize]): creates a shared process queue. Queue is a multiprocess-safe queue that can be used to transfer data between multiple processes.
maxsize is the maximum number of items allowed in the queue. If omitted, there is no size limit.
Main methods:
q.put(obj[, block[, timeout]]): inserts obj into the queue. If block is True (the default) and timeout is a positive number, this method blocks for at most timeout seconds until the queue has free space; if it times out, queue.Full is raised. If block is False and the queue is full, queue.Full is raised immediately.
q.get([block[, timeout]]): reads and removes one item from the queue. If block is True (the default) and timeout is a positive number, and no item arrives within the wait time, queue.Empty is raised. If block is False, there are two cases: if a value is available it is returned immediately; otherwise queue.Empty is raised immediately.
q.get_nowait(): the same as q.get(False).
q.put_nowait(obj): the same as q.put(obj, False).
q.empty(): returns True if q is empty at the moment the method is called. The result is unreliable: an item may be added to the queue while True is being returned.
q.full(): returns True if q is full at the moment the method is called. The result is likewise unreliable: items may be taken out of the queue while True is being returned.
q.qsize(): returns the number of items currently in the queue. The result is also unreliable, for the same reason as q.empty() and q.full().
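A minimal sketch of the blocking and timeout behaviour described above; note that the Full and Empty exceptions come from the standard queue module:

```python
import queue  # multiprocessing re-uses queue.Full / queue.Empty
from multiprocessing import Queue

q = Queue(1)              # capacity of a single item
q.put('a')                # fits; the queue is now full

try:
    q.put('b', timeout=0.1)   # blocks for up to 0.1s, then gives up
except queue.Full:
    print('put timed out: queue is full')

print(q.get())            # 'a'

try:
    q.get(timeout=0.1)        # empty queue: waits 0.1s, then raises
except queue.Empty:
    print('get timed out: queue is empty')
```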
Application:
'''The multiprocessing module supports two main forms of interprocess communication: pipes and queues. Both are implemented with message passing, but the queue interface is easier to use.'''
from multiprocessing import Process,Queue
import time
q=Queue(3)
#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) # is full
print(q.get())
print(q.get())
print(q.get())
print(q.empty()) # is empty
Producer-consumer model
The producer-consumer pattern can solve most concurrency problems in concurrent programming. It improves a program's overall data processing speed by balancing the working capacity of the producing side and the consuming side.
Why use the producer-consumer model
In the threaded world, producers are the threads that produce the data, and consumers are the threads that consume the data. In multithreaded development, if the producer is fast and the consumer is slow, the producer must wait for the consumer to finish processing before continuing to produce data. In the same way, if the processing power of the consumer is greater than that of the producer, then the consumer must wait for the producer. To solve this problem the producer and consumer model was introduced.
What is the producer-consumer model
The producer-consumer model solves the strong coupling between producer and consumer through a container. Producers and consumers do not communicate with each other directly; they communicate through a blocking queue. After producing data, the producer does not wait for the consumer to process it but throws it straight into the blocking queue, and the consumer does not ask the producer for data but takes it straight from the blocking queue. The blocking queue acts as a buffer that balances the processing capacity of producers and consumers.
Implement producer-consumer model based on queue
from multiprocessing import Process,Queue
import time,random,os

def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s eats %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='steamed bun %s' %i
        q.put(res)
        print('\033[44m%s produces %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    # Producer: the chef
    p1=Process(target=producer,args=(q,))
    # Consumer: the foodie
    c1=Process(target=consumer,args=(q,))
    # start
    p1.start()
    c1.start()
    print('main')
Summary of the producer-consumer model
There are two roles in the program: one responsible for producing data (producers) and one responsible for processing data (consumers). The producer-consumer model is introduced to balance the working capacity of producers and consumers and thus improve the overall processing speed of the program. What balances them: producer <--> queue <--> consumer. The producer-consumer model thereby decouples the two parts of the program.
The problem with the program above is that the main process never ends: producer p finishes after producing, but consumer c is stuck in an infinite loop, blocked at q.get() after emptying q.
The solution is to have the producer send an end signal into the queue after producing, so that the consumer can break out of the loop upon receiving it.
from multiprocessing import Process,Queue
import time,random,os

def consumer(q):
    while True:
        res=q.get()
        if res is None:break # end upon receiving the end signal
        time.sleep(random.randint(1,3))
        print('\033[45m%s eats %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='steamed bun %s' %i
        q.put(res)
        print('\033[44m%s produces %s\033[0m' %(os.getpid(),res))
    q.put(None) # send the end signal

if __name__ == '__main__':
    q=Queue()
    # Producer: the chef
    p1=Process(target=producer,args=(q,))
    # Consumer: the foodie
    c1=Process(target=consumer,args=(q,))
    # start
    p1.start()
    c1.start()
    print('main')
The idea is simply to send an end signal; note that with multiple consumers, one end signal is needed per consumer. There is also another kind of queue that provides this mechanism for us.
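A minimal sketch of the one-sentinel-per-consumer rule (consume_until_sentinel is an illustrative helper, and it is driven in a single process here purely to show the protocol; in the real program each call would run in its own consumer Process):

```python
from multiprocessing import Queue

def consume_until_sentinel(q):
    """Drain items until the None sentinel arrives; return what was eaten."""
    eaten = []
    while True:
        res = q.get()
        if res is None:
            break  # this consumer's personal end signal
        eaten.append(res)
    return eaten

q = Queue()
n_consumers = 2
for i in range(4):
    q.put('steamed bun %s' % i)
for _ in range(n_consumers):
    q.put(None)  # one sentinel per consumer, sent after all real items

print(consume_until_sentinel(q))  # the first caller drains all four buns
print(consume_until_sentinel(q))  # the second finds only its sentinel: []
```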
JoinableQueue([maxsize]): like a Queue object, but the queue allows the consumer of an item to notify the producer that the item has been processed successfully. The notification process is implemented with a shared semaphore and a condition variable.
Parameter Description:
maxsize is the maximum number of items allowed in the queue. If omitted, there is no size limit. In addition to the same methods as a Queue object, JoinableQueue instances also have:
q.task_done(): the consumer uses this method to signal that an item returned by q.get() has been processed. If this method is called more times than there were items removed from the queue, ValueError is raised.
q.join(): the producer calls this method to block until all items in the queue have been processed. Blocking continues until q.task_done() has been called for every item that was put into the queue.
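Before the full example, a minimal single-process sketch of how q.task_done() pairs with q.join() (the strings are illustrative):

```python
from multiprocessing import JoinableQueue

q = JoinableQueue()
q.put('bun 0')
q.put('bun 1')

print(q.get())   # take the first item...
q.task_done()    # ...and acknowledge that it has been processed
print(q.get())   # take the second item
q.task_done()

q.join()         # returns immediately: every put() has been acknowledged
print('all items processed')
```

If either task_done() call were missing, q.join() would block forever, which is exactly the signal the producers in the example below rely on.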
from multiprocessing import Process,JoinableQueue
import time,random,os

def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s eats %s\033[0m' %(os.getpid(),res))
        q.task_done() # signal to q.join() that one item has been taken and processed

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s produces %s\033[0m' %(os.getpid(),res))
    q.join()

if __name__ == '__main__':
    q=JoinableQueue()
    # Producers: chefs
    p1=Process(target=producer,args=('steamed bun',q))
    p2=Process(target=producer,args=('bone',q))
    p3=Process(target=producer,args=('swill',q))
    # Consumers: foodies
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True
    # start
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()
    p1.join()
    p2.join()
    p3.join()
    print('main')

# The main process waits for p1,p2,p3 to end ----> which means c1,c2 must
# already have taken (and processed) all the data p1,p2,p3 put into the queue.
# After that c1,c2 have nothing left to take and should end together with the
# main process, so they are made daemon processes.
Pipes
Interprocess communication (IPC) mode 2: pipes (not recommended; just understand it)
Introduction
# The class for creating a pipe:
Pipe([duplex]): creates a pipe between processes and returns a tuple (conn1,conn2), where conn1 and conn2 are the connection objects at the two ends of the pipe. Note that the pipe must be created before the Process objects.
# Parameter description:
duplex: the pipe is full duplex by default. If duplex is set to False, conn1 can only be used for receiving and conn2 can only be used for sending.
# Main methods:
conn1.recv(): receives an object sent by conn2.send(obj). recv blocks until there is a message to receive. If the other end of the connection is closed, recv raises EOFError.
conn1.send(obj): sends an object over the connection. obj can be any object compatible with serialization.
# Other methods:
conn1.close(): closes the connection. This method is called automatically if conn1 is garbage collected.
conn1.fileno(): returns the integer file descriptor used by the connection.
conn1.poll([timeout]): returns True if data is available on the connection. timeout specifies the maximum time to wait; if omitted, the method returns its result immediately; if timeout is set to None, the operation waits indefinitely for data to arrive.
conn1.recv_bytes([maxlength]): receives a complete byte message sent by the send_bytes() method. maxlength specifies the maximum number of bytes to receive; if an incoming message exceeds this maximum, IOError is raised and nothing further can be read on the connection. If the other end of the connection is closed and there is no more data, EOFError is raised.
conn1.send_bytes(buffer[, offset[, size]]): sends a buffer of byte data over the connection. buffer is any object that supports the buffer interface, offset is the byte offset into the buffer, and size is the number of bytes to send. The data is sent as a single message, which is then received by calling recv_bytes().
conn1.recv_bytes_into(buffer[, offset]): receives a complete byte message and stores it in buffer, which must support the writable buffer interface (that is, a bytearray object or similar). offset specifies the byte offset in the buffer at which to place the message. The return value is the number of bytes received. If the message is longer than the available buffer space, a BufferTooShort exception is raised.
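A minimal sketch of the send/recv and duplex behaviour described above, run in a single process (the payloads are illustrative):

```python
from multiprocessing import Pipe

# full-duplex pipe (the default): both ends can send and receive
conn1, conn2 = Pipe()
conn1.send(['bun', 0])
print(conn2.recv())        # ['bun', 0]
conn2.send('thanks')
print(conn1.recv())        # 'thanks'

# half-duplex pipe: the first connection only receives, the second only sends
conn_r, conn_w = Pipe(duplex=False)
conn_w.send('one way only')
print(conn_r.recv())       # 'one way only'

for c in (conn1, conn2, conn_r, conn_w):
    c.close()
```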
Inter-process communication based on pipes (similar to queues; a queue is essentially a pipe plus locks)
from multiprocessing import Process,Pipe
import time,os

def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s received bun: %s' %(name,baozi))
        except EOFError:
            right.close()
            break

def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()

if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()

    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('Main process')
Process pools
When using Python for system administration, especially when operating on many file directories at once or remotely controlling many hosts, parallel operation can save a lot of time. Multiprocessing is one means of achieving concurrency, and the following issues need attention:
1. The number of concurrent tasks is often much larger than the number of cores
2. An operating system cannot start processes without limit; it usually starts about as many processes as there are cores
3. Starting too many processes actually reduces efficiency (starting a process consumes system resources, and processes in excess of the number of cores cannot run in parallel)
For example, when the number of objects to operate on is small, you can use Process in multiprocessing to create a few processes directly; but when there are many, manually limiting the number of processes becomes far too tedious, and that is where the process pool comes in.
We can control the number of processes by maintaining a process pool, such as HTTPD’s process mode, specifying the minimum and maximum number of processes…
The class for creating a process pool: Pool([numprocess]). If numprocess is specified as 3, the pool creates three processes at the start and then uses those same three processes to perform all tasks from beginning to end; no other processes are started.
Main methods:
p.apply(func[, args[, kwargs]]): executes func(*args,**kwargs) in one of the pool's worker processes and returns the result. Note that this does not execute func concurrently in all pool workers; to execute func concurrently with different arguments, either call p.apply() from several different threads or use p.apply_async().
p.apply_async(func[, args[, kwargs[, callback]]]): executes func(*args,**kwargs) in a pool worker process and returns the result asynchronously. The result of this method is an instance of the AsyncResult class. callback is a callable object that accepts one input argument; when the result of func becomes available, it is passed to callback. callback must not perform any blocking operations, or it will hold up the receipt of results from other asynchronous operations.
p.close(): closes the process pool, preventing further operations. Tasks already submitted will be completed before the worker processes terminate.
p.join(): waits for all worker processes to exit. This method can only be called after close() or terminate().

from multiprocessing import Pool
import os,time

def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2
if __name__ == '__main__':
    p=Pool(3) # create three processes in the pool from the start; these same three processes perform all the tasks
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) # synchronous call: wait in place until the task finishes. Whether or not the task blocks, the synchronous call waits here; if the task does block, the process is also stripped of the CPU while waiting
        res_l.append(res)
    print(res_l)
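p.apply() above is a synchronous (blocking) call. Here is a small sketch of the asynchronous alternative using p.apply_async() and collecting the AsyncResult objects (the square function is illustrative, not from the original):

```python
from multiprocessing import Pool

def square(n):
    return n ** 2

if __name__ == '__main__':
    with Pool(3) as p:
        # submit all tasks without waiting; each call returns an AsyncResult
        async_results = [p.apply_async(square, args=(i,)) for i in range(5)]
        # AsyncResult.get() blocks until that particular task's result is ready
        res_l = [r.get() for r in async_results]
    print(res_l)  # [0, 1, 4, 9, 16]
```

Because all five tasks are submitted before any result is collected, the three workers run them concurrently, unlike the serial apply() loop above.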