1 Basic concepts of processes
What is a process
A process is a dynamic execution of a program over a data set. A process generally consists of three parts: the program, the data set, and the process control block (PCB). The program describes what the process does and how it does it; the data set comprises the resources the program needs during execution; the PCB records the external characteristics of the process and describes its changing execution state. The system uses the PCB to control and manage the process, and the PCB is the only sign by which the system knows that the process exists.
2 Parent process and child process
The Linux operating system provides the fork() function to create child processes. fork() is special: it is called once but returns twice, because the operating system makes a copy of the current (parent) process and the call returns in both the parent and the child. In the child it always returns 0; in the parent it returns the PID of the child. We can therefore determine whether we are executing in the parent or the child by checking whether the return value is 0. Python also provides fork(), located in the os module.
```python
import os
import time

print("Before creating child process: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
pid = os.fork()
if pid == 0:
    print("Child process: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    time.sleep(5)
else:
    print("Parent process: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    time.sleep(5)

# The following line is printed twice: once in the parent and once in the
# child. In the parent, fork() returned the PID of the child (greater than 0).
print("fork: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
```

Result:

```
Before creating child process: pid=3792, ppid=2285
Parent process: pid=3792, ppid=2285
Child process: pid=3793, ppid=3792
fork: pid=3793, ppid=3792
fork: pid=3792, ppid=2285
```
2.1 How to distinguish the parent process from the child process
Call fork() and save its return value (pid = os.fork()). Check whether the returned pid is 0: if it is 0, the code is running in the child process; otherwise it is running in the parent.
Since fork() is a Linux concept, it is best to use the subprocess module to create child processes if you want to be cross-platform.
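A minimal cross-platform sketch using the subprocess module (the child command here is illustrative; sys.executable avoids hard-coding the interpreter path):

```python
import subprocess
import sys

# Run a child Python process; works on both Windows and Unix-like systems.
result = subprocess.run(
    [sys.executable, "-c", "print('hello from child')"],
    capture_output=True, text=True,
)
print(result.returncode)       # 0 on success
print(result.stdout.strip())   # hello from child
```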
2.2 How to reclaim a child process
In Python, the os.wait() method reclaims the resources occupied by a finished child process:

```python
pid, status = os.wait()
```

If a child process has exited but has not yet been reclaimed, it is a zombie process; if the parent dies first, the child becomes an orphan and is adopted by the init process.
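A minimal sketch (Unix-only, since it relies on fork) showing os.wait() reclaiming a child; os.WEXITSTATUS() extracts the exit code from the status value:

```python
import os

pid = os.fork()
if pid == 0:
    os._exit(7)   # child exits immediately with status 7
else:
    child_pid, status = os.wait()   # blocks until a child exits, then reclaims it
    print(child_pid == pid)         # True: the reclaimed child is ours
    print(os.WEXITSTATUS(status))   # 7
```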
3 Python process module
Python's multiprocessing module offers several ways to create processes, which differ in usage and in how process resources are reclaimed. The following sections analyze three of them: fork, Process, and Pool.
3.1 fork()
```python
import os

pid = os.fork()  # create a child process
# os.wait()      # wait for the child process to finish and release its resources
print(pid)       # the parent prints the child's PID; the child prints 0
```

Result:

```
3874
0
```
Disadvantages:
1. Poor portability: it is available only on Unix-like systems, not on Windows.
2. Poor scalability: managing many processes becomes complicated.
3. It produces orphan and zombie processes, whose resources must be reclaimed manually.
Advantages:
It is close to the operating system's low-level process creation, so it is efficient.
3.2 The Process class
The multiprocessing module provides the Process class to create a new process:

```python
import os
from multiprocessing import Process

def fun(name):
    print("2 Child process info: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    print("hello " + name)

def test():
    print('ssss')

if __name__ == "__main__":
    print("1 Main process info: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps = Process(target=fun, args=('jingsanpang',))  # note the trailing comma: args must be a tuple
    print("111 ##### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
    print("3 Process info: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    print(ps.is_alive())  # False: not started yet
    ps.start()
    print(ps.is_alive())  # True: started
    print("222 #### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
    print("4 Process info: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps.join()  # wait for the child process to finish
    print(ps.is_alive())
    print("5 Process info: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps.terminate()
    print("6 Process info: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
```
Features:
1. A Process object can create a process, but it is not itself the process; whether the object is deleted has no direct bearing on whether system resources are reclaimed.
2. By default, the main process reclaims the child's resources after the child finishes; you do not need to do it manually. join() controls the order in which child processes end, and internally it also cleans up zombie processes and reclaims resources.
3. When a Process is created, the child process receives a complete copy of the main process's Process object, so the object exists in both; the copy in the child is static and is not executed.
4. When a child process finishes, it becomes a zombie until join() reclaims it. Starting another process also triggers reclamation of zombies internally, so calling join() is not strictly required for cleanup.
5. On Windows, the child's Process object is cleared automatically as soon as the child ends. On Linux, if neither join() nor start() triggers reclamation, the child's Process object is cleared when the main process ends.
Alternatively, you can create a process by subclassing Process and overriding its run() method.
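A minimal sketch of this approach (the class and field names are illustrative); run() is executed in the child process when start() is called:

```python
import os
from multiprocessing import Process

class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name   # Process.name is a settable attribute

    def run(self):
        # This body runs in the child process.
        print("child pid=%s, name=%s" % (os.getpid(), self.name))

if __name__ == "__main__":
    p = MyProcess("worker-1")
    p.start()
    p.join()
```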
3.3 Process pool: Pool (multiple processes)
```python
import multiprocessing

def work(msg):
    proc_name = multiprocessing.current_process().name
    print('process: ' + proc_name + '-' + msg)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=5)  # create a pool of 5 worker processes
    for i in range(20):
        msg = "process %d" % i
        pool.apply_async(work, (msg,))
    pool.close()  # close the pool: no new tasks can be submitted
    pool.join()   # wait for all workers to finish; must be called after close()
    print("Sub-process all done.")
```
apply_async() is the non-blocking variant of apply(): apply_async() submits tasks to run in parallel, while apply() blocks until each task completes. (In Python 2, apply() was also a built-in function with equivalent semantics; that built-in no longer exists in Python 3.) Note that the output is not printed in the order of the code's for loop.
Multiple child processes with return values
apply_async() returns an AsyncResult object rather than the function's return value directly. In the code below, if func returns a value, each call to pool.apply_async(func, (msg,)) yields such an object, and collecting them lets us retrieve the values returned by all the processes in the pool.
```python
import multiprocessing

def func(msg):
    return multiprocessing.current_process().name + '-' + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)  # create a pool of 4 worker processes
    results = []
    for i in range(20):
        msg = "process %d" % i
        results.append(pool.apply_async(func, (msg,)))
    pool.close()  # no new tasks may be submitted; must be called before join()
    pool.join()   # wait for all workers in the pool to finish
    print("Sub-process(es) done.")
    for res in results:
        print(res.get())  # the value func returned in the worker
```
Unlike the previous output, this output is ordered, because the results are read back in the order the tasks were submitted. If the computer has eight cores and you create eight processes, typing the top command in Ubuntu and pressing 1 shows that the usage of each CPU is fairly even.
4 Interprocess communication mode
- Pipe: A half-duplex communication mode in which data flows only one way and can only be used between related processes. Process kinship usually refers to the parent-child process relationship.
- Named pipe FIFO: A named pipe is also a half-duplex communication mode, but it allows communication between unrelated processes.
- MessageQueue: a message queue is a linked list of messages, stored in the kernel and identified by a message queue identifier. It overcomes the drawbacks of signals (which carry little information) and pipes (which carry only a plain byte stream and have a limited buffer size).
- SharedMemory: shared memory maps a segment of memory that other processes can access. The segment is created by one process but can be attached by many. Shared memory is the fastest IPC method and was designed specifically to address the lower efficiency of the other inter-process communication mechanisms. It is often used together with other mechanisms, such as semaphores, to achieve synchronization and communication between processes.
Among the above communication methods, message queue is the most frequently used one.
(1) Pipe
```python
import multiprocessing

def foo(conn):
    conn.send('hello father')  # child sends a message to the parent
    print(conn.recv())         # child receives the parent's reply

if __name__ == '__main__':
    # Pipe() returns two connection endpoints; by default both ends can send
    # and receive. Pass duplex=False for one-way communication.
    conn1, conn2 = multiprocessing.Pipe()
    p = multiprocessing.Process(target=foo, args=(conn1,))  # child uses conn1
    p.start()
    print(conn2.recv())    # parent receives through conn2
    conn2.send('hi son')   # parent sends through conn2
    p.join()
```

Result:

```
hello father
hi son
```
A Queue is a process-safe queue that can be used to transfer data between multiple processes.
Some common methods of Queue:
- Queue.qsize() : Returns the number of messages in the current Queue;
- Queue.empty() : Returns True if the Queue is empty, False otherwise;
- Queue.full() : Returns True if the Queue is full, False otherwise;
- Queue.get(): Gets a message from a Queue and then removes it from the Queue.
- Queue.get_nowait(): equivalent to Queue.get(False); raises queue.Empty if the queue is empty;
- Queue.put(): puts a message into the queue;
- Queue.put_nowait(): equivalent to Queue.put(item, False); raises queue.Full if the queue is full.

Example:
```python
import time
from multiprocessing import Process, Queue

def write(q):
    for i in ['A', 'B', 'C', 'D', 'E']:
        print('Put %s to queue' % i)
        q.put(i)
        time.sleep(0.5)

def read(q):
    while True:
        v = q.get(True)  # block until a value is available
        print('Get %s from queue' % v)

if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    print('write process = ', pw)
    print('read process = ', pr)
    pw.start()
    pr.start()
    pw.join()        # wait for the writer to finish
    pr.terminate()   # read() loops forever, so join() would never return; terminate it
```
Queues and pipes only implement data exchange, not data sharing (where one process directly changes another process's data).
Note: Communication between processes should avoid using shared data
Multi-process producer-consumer
The following implements the producer-consumer pattern with multiple processes:
```python
import multiprocessing
import time

class MultiProcessProducer(multiprocessing.Process):
    def __init__(self, num, queue):
        """Constructor"""
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue

    def run(self):
        t1 = time.time()
        print('producer start ' + str(self.num))
        for i in range(1000):
            self.queue.put((i, self.num))
        t2 = time.time()
        print('producer exit ' + str(self.num))
        print('producer ' + str(self.num) + ', use_time: ' + str(t2 - t1))

class MultiProcessConsumer(multiprocessing.Process):
    def __init__(self, num, queue):
        """Constructor"""
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue

    def run(self):
        t1 = time.time()
        print('consumer start ' + str(self.num))
        while True:
            d = self.queue.get()
            if d is not None:
                continue
            else:
                break  # None is the sentinel telling this consumer to stop
        t2 = time.time()
        print('consumer exit ' + str(self.num))
        print('consumer ' + str(self.num) + ', use time: ' + str(t2 - t1))

def main():
    # create the shared queue
    queue = multiprocessing.Queue()
    # create producer and consumer processes
    producers = []
    for i in range(5):
        producers.append(MultiProcessProducer(i, queue))
    consumers = []
    for i in range(5):
        consumers.append(MultiProcessConsumer(i, queue))
    # start all processes
    for p in producers:
        p.start()
    for c in consumers:
        c.start()
    # wait for the producers to exit, then send one sentinel per consumer
    for p in producers:
        p.join()
    for _ in consumers:
        queue.put(None)
    for c in consumers:
        c.join()
    print('all done')

if __name__ == "__main__":
    main()
```
6 Summary
There are two ways to create multiple processes in Python:
(1) fork the child process
(2) Use the multiprocessing library to create a child process
Note that queue.queue is thread-safe, but not process-safe, so multiprocesses usually use thread-safe multiprocessing.queue ().
multiprocessing.Pool() creates a pool of worker processes, for example pool = multiprocessing.Pool(processes=3). pool.apply_async() submits tasks in non-blocking mode, while pool.apply() blocks until each task finishes; with apply_async, the speed-up over the blocking version is roughly the size of the process pool. To collect return values, keep the AsyncResult objects, e.g. results.append(pool.apply_async(func, (msg,))), and later call .get() on each.