Practical article four
Multiple processes
Under Windows, whether you use ProcessPoolExecutor or multiprocessing, the process-spawning code must be placed inside an if __name__ == '__main__': block at module level, otherwise an error is raised.
from concurrent.futures import ProcessPoolExecutor, as_completed

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, num) for num in range(15, 45)]
        for future in as_completed(all_task):
            data = future.result()
            print('result: {}'.format(data))
- When dealing with CPU-intensive problems, multiple processes are faster than multiple threads, but the speed-up is not proportional to the number of processes, because switching between processes costs more than switching between threads.
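As a quick check, here is a minimal sketch comparing the two pools on the same CPU-bound function (fib(30), six tasks, and a pool size of 3 are arbitrary choices; exact timings depend on the machine):

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

if __name__ == '__main__':
    for executor_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.time()
        with executor_cls(3) as executor:
            # CPU-bound work: threads are serialized by the GIL,
            # while processes can run on separate cores
            list(executor.map(fib, [30] * 6))
        print('{}: {:.2f}s'.format(executor_cls.__name__, time.time() - start))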
multiprocessing
import time
import multiprocessing

def get_html(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    progress = multiprocessing.Process(target=get_html, args=(2,))
    progress.start()
    progress.join()
    print(progress.pid)  # print the ID of the process
The multiprocessing process pool
# reuses the get_html function defined above
if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    result = pool.apply_async(get_html, args=(3,))  # submit a task to the pool
    pool.close()  # close() must be called before join() so the pool stops accepting new tasks, otherwise join() raises an error
    pool.join()
    print(result.get())
- Common methods:
- apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): the asynchronous (non-blocking) version of the apply() method
- apply(self, func, args=(), kwds={}): submit a task and block until its result is ready (see the sketch after this list)
- imap(self, func, iterable, chunksize=1): corresponds to the map method of the executor above; results come back in argument order

pool = multiprocessing.Pool(multiprocessing.cpu_count())
for result in pool.imap(get_html, [1, 5, 3]):
    print('{} sleep success'.format(result))
>> 1 sleep success
>> 5 sleep success
>> 3 sleep success
- imap_unordered(self, func, iterable, chunksize=1): like imap, but yields results in completion order, so whichever task finishes first is returned first

for result in pool.imap_unordered(get_html, [1, 5, 3]):
    print('{} sleep success'.format(result))
>> 1 sleep success
>> 3 sleep success
>> 5 sleep success
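For the blocking/non-blocking difference between apply() and apply_async(), here is a minimal sketch (it reuses the get_html function from above; the pool size and sleep time are arbitrary):

import time
import multiprocessing

def get_html(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    # apply() blocks until the task finishes, then returns its result
    print(pool.apply(get_html, args=(1,)))
    # apply_async() returns an AsyncResult immediately; get() blocks instead
    result = pool.apply_async(get_html, args=(1,))
    print(result.get())
    pool.close()
    pool.join()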
- Communication between processes
- Note that queues in multiprocessing are not the same as the queues used with threads; here we use Queue from multiprocessing

import time
from multiprocessing import Process, Queue

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
- Note that a queue created with multiprocessing.Queue cannot be used with Pool processes

from multiprocessing import Pool, Queue

# producer and consumer are the functions defined above
if __name__ == "__main__":
    queue = Queue(10)
    pool = Pool(2)
    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))
    pool.close()
    pool.join()  # no result is returned: the queue cannot be passed to pool workers
- Note that communication between processes in a pool needs a queue from Manager

from multiprocessing import Pool, Manager

# producer and consumer are the functions defined above
if __name__ == "__main__":
    queue = Manager().Queue(10)
    pool = Pool(2)
    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))
    pool.close()
    pool.join()
- Use Pipe for communication
- Pipe can only be used for communication between two processes
- Pipe returns a pair of Connection instances. The common methods of Connection are send(self, obj) and recv(self):

def Pipe(duplex=True):
    return Connection(), Connection()

class Connection(object):
    def send(self, obj):
        pass
    def recv(self):
        pass
- Pipe performs better than Queue, because multiprocessing.Queue is built on top of a pipe plus locks and therefore carries extra synchronization overhead

from multiprocessing import Process, Pipe

def producer(pipe):
    pipe.send("bobby")

def consumer(pipe):
    print(pipe.recv())

if __name__ == "__main__":
    receive_pipe, send_pipe = Pipe()  # the two ends of the pipe: receiver and sender
    my_producer = Process(target=producer, args=(send_pipe,))
    my_consumer = Process(target=consumer, args=(receive_pipe,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
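To see the difference for yourself, here is a minimal benchmark sketch (the message count of 10000 is arbitrary and the timings are machine-dependent):

import time
from multiprocessing import Process, Pipe, Queue

N = 10000  # arbitrary message count for the comparison

def pipe_sender(conn):
    for i in range(N):
        conn.send(i)

def queue_sender(queue):
    for i in range(N):
        queue.put(i)

if __name__ == "__main__":
    receive_conn, send_conn = Pipe()
    p = Process(target=pipe_sender, args=(send_conn,))
    start = time.time()
    p.start()
    for _ in range(N):
        receive_conn.recv()
    p.join()
    print('Pipe:  {:.3f}s'.format(time.time() - start))

    queue = Queue()
    p = Process(target=queue_sender, args=(queue,))
    start = time.time()
    p.start()
    for _ in range(N):
        queue.get()
    p.join()
    print('Queue: {:.3f}s'.format(time.time() - start))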
- Sharing global variables does not work across processes: each child process gets its own copy, so a change made in one process is not visible in another

import time
from multiprocessing import Process

def producer(a):
    a += 100
    time.sleep(2)

def consumer(a):
    time.sleep(2)
    print(a)

if __name__ == '__main__':
    a = 1
    my_producer = Process(target=producer, args=(a,))
    my_consumer = Process(target=consumer, args=(a,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
>> 1  # prints 1, not 101
- Interprocess synchronization (maintaining common memory/variables) uses Manager()

def Manager():
    return multiprocessing.SyncManager()

class SyncManager(multiprocessing.managers.BaseManager):
    def Condition(self, lock=None):
        return threading.Condition(lock)
    def Event(self):
        return threading.Event()
    def Lock(self):
        return threading.Lock()
    def Namespace(self):
        pass
    def Queue(self, maxsize=None):
        return queue.Queue()
    def RLock(self):
        return threading.RLock()
    def Semaphore(self, value=None):
        return threading.Semaphore(value)
    def Array(self, typecode, sequence):
        pass
    def Value(self, typecode, value):
        pass
    def dict(self, mapping_or_sequence):
        pass
    def list(self, sequence):
        pass
from multiprocessing import Process, Manager

def add_data(p_dict, key, value):
    p_dict[key] = value

if __name__ == '__main__':
    progress_dict = Manager().dict()
    first = Process(target=add_data, args=(progress_dict, 'bobby1', 22))
    second = Process(target=add_data, args=(progress_dict, 'bobby2', 23))
    first.start()
    second.start()
    first.join()
    second.join()
    print(progress_dict)
>> {'bobby1': 22, 'bobby2': 23}
- Pay attention to data synchronization: when several processes modify the same shared object, acquire a lock where necessary, as in the sketch below
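A minimal sketch of locking around a shared Manager dict (the 'count' key and the number of worker processes are illustrative):

from multiprocessing import Process, Manager

def add_one(p_dict, lock):
    with lock:  # without the lock, this read-modify-write can race
        p_dict['count'] = p_dict['count'] + 1

if __name__ == '__main__':
    manager = Manager()
    p_dict = manager.dict()
    p_dict['count'] = 0
    lock = manager.Lock()  # the lock must also come from the Manager
    workers = [Process(target=add_one, args=(p_dict, lock)) for _ in range(10)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(p_dict['count'])  # 10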