Practical article four

Multiple processes

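When using multiple processes on Windows, whether through `ProcessPoolExecutor` or `multiprocessing`, the code that starts the processes must be placed inside an `if __name__ == '__main__':` block; otherwise an error is raised. (Windows starts each child by spawning a fresh interpreter that re-imports the main module, so unguarded process-creating code would run again in every child.)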

from concurrent.futures import ProcessPoolExecutor, as_completed
def fib(n):
    """Return the n-th Fibonacci number, with fib(1) == fib(2) == 1.

    Uses the naive exponential recursion, which makes this a CPU-bound
    workload, suitable for demonstrating a process pool. Any n <= 2
    (including 0 and negatives) returns 1.
    """
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)
if __name__ == '__main__':
    # Three worker processes compute fib(15) .. fib(44) in parallel.
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, num) for num in range(15, 45)]
        # as_completed yields futures as they finish, not in submission order.
        for future in as_completed(all_task):
            data = future.result()
            print('result: {}'.format(data))
Copy the code
  • For CPU-bound work, multiple processes are faster than multiple threads (in CPython, the GIL prevents threads from executing Python code in parallel). The speed-up is rarely proportional to the number of processes, though, because creating and switching between processes costs more than switching between threads.

multiprocessing

import time
import multiprocessing

def get_html(n):
    """Simulate a slow task: block for ``n`` seconds, then hand ``n`` back."""
    seconds = n
    time.sleep(seconds)
    return seconds

if __name__ == '__main__':
    # Run get_html(2) in a separate child process and wait for it to finish.
    progress = multiprocessing.Process(target=get_html, args=(2,))
    progress.start()
    progress.join()
    print(progress.pid)  # Print the ID of the process
Copy the code

The process pool of multiprocessing

if __name__ == '__main__':
    # One worker process per CPU core.
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    # args must be a tuple: (3) is just the int 3, (3,) is a one-element tuple.
    result = pool.apply_async(get_html, args=(3,))  # Submit the task to the pool
    # close() must be called before join(): it stops the pool from accepting
    # new tasks; join() on a pool that is still running raises ValueError.
    pool.close()
    pool.join()
    print(result.get())
Copy the code
  • Common methods:

    • `apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None)`: asynchronous version of `apply()`; it returns an `AsyncResult` immediately instead of waiting for the result

    • `apply(self, func, args=(), kwds={})`: run `func` in a worker process and block until its result is ready

    • `imap(self, func, iterable, chunksize=1)`: corresponds to the `map` method of the executors covered earlier; results are yielded lazily, in input order

      pool = multiprocessing.Pool(multiprocessing.cpu_count())
      # imap yields results lazily, in the same order as the input list.
      for result in pool.imap(get_html, [1, 5, 3]):
          print('{} sleep success'.format(result))
      >>
      1 sleep success
      5 sleep success
      3 sleep success
      Copy the code
    • `imap_unordered(self, func, iterable, chunksize=1)`: like `imap`, but results are yielded in completion order; whichever task finishes first is returned first

      # imap_unordered yields each result as soon as its task finishes.
      for result in pool.imap_unordered(get_html, [1, 5, 3]):
          print('{} sleep success'.format(result))
      >>
      1 sleep success
      3 sleep success
      5 sleep success
      Copy the code

Communication between processes

  • Note that the queue used between processes is not the same as the one used between threads (`queue.Queue`). Here we use the `Queue` from `multiprocessing`.

    import time
    import multiprocessing
    from multiprocessing import Process, Queue

    def producer(queue):
        """Put one item ("a") on the shared queue, then sleep for two seconds."""
        queue.put("a")
        time.sleep(2)

    def consumer(queue):
        """Sleep two seconds, then take one item off the shared queue and print it."""
        time.sleep(2)
        data = queue.get()  # blocks until an item is available
        print(data)

    if __name__ == "__main__":
        # A multiprocessing.Queue (capacity 10) handed to both child processes.
        queue = Queue(10)
        my_producer = Process(target=producer, args=(queue,))
        my_consumer = Process(target=consumer, args=(queue,))
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()
    Copy the code
  • Note that a `Queue` created with `multiprocessing` cannot be used with the worker processes of a `Pool`

    from multiprocessing import Pool, Queue

    if __name__ == "__main__":
        # Negative example: a plain multiprocessing.Queue passed to Pool workers.
        shared_queue = Queue(10)
        worker_pool = Pool(2)

        # Submit producer first, then consumer, both sharing the same queue.
        for task in (producer, consumer):
            worker_pool.apply_async(task, args=(shared_queue,))

        worker_pool.close()
        worker_pool.join()
    # Nothing is printed: the AsyncResults are never read, so any error raised
    # for these tasks stays hidden (presumably the Queue cannot be sent to Pool
    # workers; call .get() on the results to see the exception — TODO confirm).
    Copy the code
  • Note that communication between processes in the pool needs to use queues from the Manager

    from multiprocessing import Manager, Pool

    if __name__ == "__main__":
        # The Manager's queue proxy can be passed to Pool workers, unlike a plain
        # multiprocessing.Queue. Keep the manager alive (inside the with-block)
        # until the workers are done; leaving the block shuts the manager down.
        with Manager() as manager:
            queue = manager.Queue(10)
            pool = Pool(2)

            pool.apply_async(producer, args=(queue,))
            pool.apply_async(consumer, args=(queue,))

            pool.close()
            pool.join()
    Copy the code
  • Use `Pipe` for communication

    • Pipe can only be used for communication between two processes

    • `Pipe()` returns a pair of `Connection` objects, one for each end of the pipe. The most common `Connection` methods are `send(self, obj)` and `recv(self)`.

      def Pipe(duplex=True):
          """Simplified sketch of ``multiprocessing.Pipe``: return two Connection ends.

          ``duplex`` is accepted to mirror the real signature but is not used here.
          """
          first_end = Connection()
          second_end = Connection()
          return first_end, second_end
      
      class Connection:
          """Simplified sketch of a pipe ``Connection``; only send/recv are outlined."""

          def send(self, obj):
              """Send ``obj`` to the other end; in this sketch it does nothing and returns None."""

          def recv(self):
              """Receive from the other end; in this sketch it does nothing and returns None."""
      Copy the code
    • Pipes perform better than queues (a `multiprocessing.Queue` is itself built on a pipe plus locks and a background feeder thread)

      from multiprocessing import Pipe, Process

      def producer(pipe):
          """Send the string "bobby" through the given pipe end."""
          pipe.send("bobby")

      def consumer(pipe):
          """Receive one object from the given pipe end and print it (blocks until it arrives)."""
          print(pipe.recv())

      if __name__ == "__main__":
          # Pipe() returns the two connected ends; one end receives, the other sends.
          receive_pipe, send_pipe = Pipe()

          my_producer = Process(target=producer, args=(send_pipe,))
          my_consumer = Process(target=consumer, args=(receive_pipe,))

          my_producer.start()
          my_consumer.start()
          my_producer.join()
          my_consumer.join()
      Copy the code
  • Ordinary (global) variables cannot be shared between processes: each child works on its own copy, so a change made in one process is not seen by the others

    import time
    from multiprocessing import Process

    def producer(a):
        """Add 100 to ``a``; this only rebinds the local name, so no caller sees it."""
        a += 100
        time.sleep(2)

    def consumer(a):
        """Sleep two seconds, then print the value of ``a`` this process received."""
        time.sleep(2)
        print(a)

    if __name__ == '__main__':
        a = 1
        # Each child receives its own copy of ``a``.
        my_producer = Process(target=producer, args=(a,))
        my_consumer = Process(target=consumer, args=(a,))
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()
    >> 1 # return 1, not 101
    Copy the code
  • Use `Manager()` to share and synchronize data between processes (a common dict, list, value, queue, lock, …)

    def Manager():
        """Simplified view of ``multiprocessing.Manager()``.

        Creates a ``multiprocessing.managers.SyncManager``, starts it, and returns
        it. The caller is responsible for calling ``shutdown()`` (or using it as a
        context manager).
        """
        # The managers submodule may not be loaded by a plain `import multiprocessing`.
        import multiprocessing.managers
        manager = multiprocessing.managers.SyncManager()
        manager.start()
        return manager
    
    import multiprocessing.managers
    import queue
    import threading

    class SyncManager(multiprocessing.managers.BaseManager):
        """Simplified outline of ``multiprocessing.managers.SyncManager``.

        Lists the factory methods a manager offers for shared objects. The real
        class returns proxies to objects held by the manager process; this sketch
        builds plain local objects, and the empty-bodied factories return None.
        """

        def Condition(self, lock=None):
            """Return a ``threading.Condition`` using ``lock`` (a new RLock if None)."""
            return threading.Condition(lock)

        def Event(self):
            """Return a new ``threading.Event``."""
            return threading.Event()

        def Lock(self):
            """Return a new ``threading.Lock``."""
            return threading.Lock()

        def Namespace(self):
            """Placeholder for the shared-namespace factory; returns None in this sketch."""

        def Queue(self, maxsize=None):
            """Return a ``queue.Queue`` holding at most ``maxsize`` items.

            ``None`` (the default) means unbounded; ``queue.Queue`` spells that as 0.
            """
            return queue.Queue(0 if maxsize is None else maxsize)

        def RLock(self):
            """Return a new ``threading.RLock``."""
            return threading.RLock()

        def Semaphore(self, value=None):
            """Return a ``threading.Semaphore`` with initial counter ``value``.

            ``None`` selects the semaphore's default counter of 1; passing None
            straight to ``threading.Semaphore`` would raise TypeError.
            """
            return threading.Semaphore(1 if value is None else value)

        def Array(self, typecode, sequence):
            """Placeholder for the shared-array factory; returns None in this sketch."""

        def Value(self, typecode, value):
            """Placeholder for the shared-value factory; returns None in this sketch."""

        def dict(self, mapping_or_sequence):
            """Placeholder for the shared-dict factory; returns None in this sketch."""

        def list(self, sequence):
            """Placeholder for the shared-list factory; returns None in this sketch."""
    Copy the code
    from multiprocessing import Manager, Process

    def add_data(p_dict, key, value):
        """Store ``value`` under ``key`` in the mapping ``p_dict``.

        In the example below ``p_dict`` is a Manager dict proxy, so the write is
        visible to the parent process as well.
        """
        p_dict[key] = value

    if __name__ == '__main__':
        # Leaving the with-block shuts the manager down, so read the proxy inside it.
        with Manager() as manager:
            progress_dict = manager.dict()

            first = Process(target=add_data, args=(progress_dict, 'Bobby1', 22))
            second = Process(target=add_data, args=(progress_dict, 'Bobby2', 23))

            first.start()
            second.start()
            first.join()
            second.join()

            print(progress_dict)
    >> {'Bobby1': 22, 'Bobby2': 23}
    Copy the code
    • Pay attention to data synchronization, and use a lock when necessary (for example, when several processes read, modify and write back the same key)