Practical article 3

concurrent.futures

  • A high-level wrapper that provides one unified interface for both multithreading (ThreadPoolExecutor) and multiprocessing (ProcessPoolExecutor), which makes it easy to call.

  • Advantages:

    • The main thread can query the state of a thread or task and get its return value.
    • The main thread knows when a thread has finished.
    import time
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import Future

    def get_html(times):
        time.sleep(times)
        print('get page {} success'.format(times))
        return times

    executor = ThreadPoolExecutor(max_workers=2)  # set the maximum number of threads to 2

    # submit() schedules the call and immediately returns a Future object
    task1 = executor.submit(get_html, 3)
    task2 = executor.submit(get_html, 2)

    task1.done()    # whether the task has completed; this call does not block
    task2.cancel()  # cancel the task; only possible before it starts (here both tasks start at once, so this returns False)
    task1.result()  # return the task's result; blocks until the task has finished
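In the example above, task2.cancel() will usually return False, because with max_workers=2 both tasks start right away. A minimal sketch of a cancel that succeeds, assuming a single worker so the second task has to wait in the queue (the slow_task helper and the timings are illustrative, not from the original):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_task(seconds):
    # Hypothetical stand-in for get_html: sleep, then return the argument
    time.sleep(seconds)
    return seconds

# With only one worker, the second task must wait in the queue
with ThreadPoolExecutor(max_workers=1) as executor:
    running_task = executor.submit(slow_task, 0.5)
    queued_task = executor.submit(slow_task, 0.1)
    time.sleep(0.1)  # give the worker time to start the first task

    running_cancelled = running_task.cancel()  # False: already running
    queued_cancelled = queued_task.cancel()    # True: it never started
    value = running_task.result()              # blocks until the first task ends

print(running_cancelled, queued_cancelled, value)
```

Whether a task can still be cancelled depends only on whether a worker has picked it up, so the pool size decides which submitted tasks are cancellable.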
  • Common methods:

    • as_completed(fs, timeout=None):

      • A generator that yields each Future in fs as soon as it completes.
      • Results are not necessarily yielded in the order the futures were passed in: whichever finishes first is yielded first.
      # Fetch the results of tasks as they complete
      from concurrent.futures import as_completed
      urls = [3, 4, 2]
      all_task = [executor.submit(get_html, url) for url in urls]
      for future in as_completed(all_task):
          # as_completed waits for the next task to finish and yields its future,
          # so the main thread gets each result as soon as that task is done
          data = future.result()
          print("get {} page success".format(data))
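A self-contained sketch of the completion order, assuming a pool large enough (max_workers=3) that all tasks start at once; the fetch helper and the delays in seconds are illustrative. With a smaller pool, such as the two-worker executor used earlier, a queued task only starts after another finishes, so the order also depends on the pool size.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(delay):
    # Hypothetical stand-in for get_html
    time.sleep(delay)
    return delay

delays = [0.3, 0.4, 0.2]
completion_order = []
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fetch, d) for d in delays]
    # as_completed blocks until the next future finishes, then yields it
    for future in as_completed(futures):
        completion_order.append(future.result())

print(completion_order)  # typically [0.2, 0.3, 0.4]
```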
    • executor.map(fn, *iterables, timeout=None, chunksize=1):

      • Similar to Python's built-in map(): it applies the same function to each element of the iterable. It returns a generator that yields future.result() directly, i.e. the return values rather than Future objects.
      • Results are returned in the same order as the input iterable.

      # Another way to collect results; map yields them in input order
      urls = [3, 4, 2]
      for data in executor.map(get_html, urls):
          print('get {} page success'.format(data))
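A minimal sketch contrasting map with as_completed, using the same illustrative delays (the fetch helper is hypothetical): even though the last task finishes first, map still yields the results in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(delay):
    # Hypothetical stand-in for get_html
    time.sleep(delay)
    return delay

delays = [0.3, 0.4, 0.2]
with ThreadPoolExecutor(max_workers=3) as executor:
    # map yields plain results (not Future objects) in input order
    results = list(executor.map(fetch, delays))

print(results)  # [0.3, 0.4, 0.2]
```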
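    • wait(fs, timeout=None, return_when=ALL_COMPLETED): blocks the main thread until the futures in fs meet the condition given by return_when (FIRST_COMPLETED, FIRST_EXCEPTION, or the default ALL_COMPLETED), or until timeout expires. It returns a named tuple of two sets: (done, not_done).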
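A minimal sketch of wait with both FIRST_COMPLETED and ALL_COMPLETED; the fetch helper and the delays are illustrative assumptions.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED

def fetch(delay):
    # Hypothetical stand-in for get_html
    time.sleep(delay)
    return delay

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fetch, d) for d in (0.5, 0.1, 0.3)]

    # Return as soon as any one future has finished
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    first_results = [f.result() for f in done]

    # Block until all the remaining futures have finished too
    done_all, still_pending = wait(futures, return_when=ALL_COMPLETED)

print(first_results, len(not_done), len(done_all), len(still_pending))
```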

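    • with ThreadPoolExecutor(n) as executor: the executor can be used as a context manager. On leaving the with block it calls shutdown(wait=True), which waits for all submitted tasks to finish.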
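A short sketch of the with form (the fetch helper is illustrative): once the block exits, every submitted future is done, and because the executor has been shut down, a further submit raises RuntimeError.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(delay):
    # Hypothetical stand-in for get_html
    time.sleep(delay)
    return delay

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(fetch, d) for d in (0.2, 0.1)]
# Leaving the block called executor.shutdown(wait=True),
# so every submitted task has finished by this point
all_done = all(f.done() for f in futures)

try:
    executor.submit(fetch, 0.1)  # the executor is shut down now
    rejected = False
except RuntimeError:
    rejected = True

print(all_done, rejected)  # True True
```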

Future

  • submit(self, fn, *args, **kwargs):

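    # Excerpt from CPython's Lib/concurrent/futures/thread.py (around Python 3.6);
    # module imports and helpers such as _worker and _threads_queues are omitted
    class ThreadPoolExecutor(_base.Executor):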
    
        # Used to assign unique thread names when thread_name_prefix is not supplied.
        _counter = itertools.count().__next__
    
        def __init__(self, max_workers=None, thread_name_prefix=''):
            """Initializes a new ThreadPoolExecutor instance.

            Args:
                max_workers: The maximum number of threads that can be used to
                    execute the given calls.
                thread_name_prefix: An optional name prefix to give our threads.
            """
            if max_workers is None:
                # Use this number because ThreadPoolExecutor is often
                # used to overlap I/O instead of CPU work.
                max_workers = (os.cpu_count() or 1) * 5
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
    
            self._max_workers = max_workers
            self._work_queue = queue.Queue()
            self._threads = set()
            self._shutdown = False
            self._shutdown_lock = threading.Lock()
            self._thread_name_prefix = (thread_name_prefix or
                                        ("ThreadPoolExecutor-%d" % self._counter()))
        
        
        def submit(self, fn, *args, **kwargs):
            with self._shutdown_lock:  # hold the lock while the new work item is scheduled
                if self._shutdown:
                    raise RuntimeError('cannot schedule new futures after shutdown')
    
                f = _base.Future()  # create the Future object that will be returned to the caller
                w = _WorkItem(f, fn, args, kwargs)
                # _WorkItem is the unit of work the pool actually executes; the Future is passed into it

                self._work_queue.put(w)  # the whole _WorkItem instance is placed in _work_queue
                self._adjust_thread_count()
                return f
        submit.__doc__ = _base.Executor.submit.__doc__
        
        def _adjust_thread_count(self):
            # When the executor gets lost, the weakref callback will wake up
            # the worker threads.
            def weakref_cb(_, q=self._work_queue):
                q.put(None)
            # TODO(bquinlan): Should avoid creating new threads if there are more
            # idle threads than items in the work queue.
            num_threads = len(self._threads)
            if num_threads < self._max_workers:  # start a new thread only while below max_workers
                thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                         num_threads)
                t = threading.Thread(name=thread_name, target=_worker,
                                     args=(weakref.ref(self, weakref_cb),
                                           self._work_queue))
                # The thread runs _worker, which keeps taking _WorkItem objects
                # from self._work_queue and running them
                t.daemon = True
                t.start()
                self._threads.add(t)  # record the started thread in _threads
                _threads_queues[t] = self._work_queue
                
    class _WorkItem(object):
        # Runs the function and stores its result (or exception) in the Future
        def __init__(self, future, fn, args, kwargs):
            self.future = future
            self.fn = fn  # the function to be executed
            self.args = args
            self.kwargs = kwargs
    
        def run(self):
            # Returns False if the Future was cancelled while queued, so the call is skipped
            if not self.future.set_running_or_notify_cancel():
                return

            try:
                result = self.fn(*self.args, **self.kwargs)
            except BaseException as exc:
                self.future.set_exception(exc)
                # Break a reference cycle with the exception 'exc'
                self = None
            else:
                self.future.set_result(result)  # store the function's result in the Future
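To make the submit, _WorkItem and _worker flow concrete, here is a deliberately simplified, hypothetical pool (MiniThreadPool is not part of the library). It keeps the same shape: submit creates a Future, puts the work on a queue and returns the Future at once, while worker threads take items off the queue and fill in the Future. Unlike CPython, it starts all threads eagerly and skips the weakref and shutdown-lock handling.

```python
import queue
import threading
from concurrent.futures import Future

class MiniThreadPool:
    """Hypothetical, simplified pool mirroring the submit/_WorkItem/_worker flow."""

    def __init__(self, max_workers=2):
        self._work_queue = queue.Queue()
        self._threads = []
        for _ in range(max_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self._threads.append(t)

    def submit(self, fn, *args, **kwargs):
        future = Future()  # handed back to the caller immediately
        self._work_queue.put((future, fn, args, kwargs))
        return future

    def _worker(self):
        while True:
            item = self._work_queue.get()
            if item is None:  # sentinel: stop this worker
                return
            future, fn, args, kwargs = item
            # Skip work whose Future was cancelled while it sat in the queue
            if not future.set_running_or_notify_cancel():
                continue
            try:
                result = fn(*args, **kwargs)
            except BaseException as exc:
                future.set_exception(exc)
            else:
                future.set_result(result)

    def shutdown(self):
        for _ in self._threads:
            self._work_queue.put(None)
        for t in self._threads:
            t.join()

pool = MiniThreadPool(max_workers=2)
squares = [pool.submit(pow, n, 2) for n in range(5)]
print([f.result() for f in squares])  # [0, 1, 4, 9, 16]
pool.shutdown()
```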
  • Common methods for Future:

    • cancel(self): attempts to cancel the future.

      def cancel(self):
          """Cancel the future if possible.

          Returns True if the future was cancelled, False otherwise. A future
          cannot be cancelled if it is running or has already completed.
          """
          with self._condition:  # use the condition variable
              if self._state in [RUNNING, FINISHED]:
                  # A future that is running or already finished cannot be cancelled
                  return False

              if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                  return True

              self._state = CANCELLED
              self._condition.notify_all()

          self._invoke_callbacks()
          return True
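A small sketch of the three branches of cancel(). Future objects are normally created by an executor; constructing them directly here is only to make the state changes easy to see.

```python
from concurrent.futures import Future

pending = Future()
first = pending.cancel()    # True: still PENDING, so it is cancelled
second = pending.cancel()   # True again: already CANCELLED

running = Future()
running.set_running_or_notify_cancel()  # move the state to RUNNING
third = running.cancel()    # False: a RUNNING future cannot be cancelled

print(first, second, third)  # True True False
```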
    • cancelled(self): status check; returns True if the future was cancelled.

    • running(self): status check; returns True if the future is currently running.

    • done(self): status check; returns True if the future has finished or was cancelled.
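A sketch that prints the three status checks as a future moves from pending to running to finished, plus a cancelled future for comparison (the status helper is illustrative). Note that a cancelled future also counts as done.

```python
from concurrent.futures import Future

def status(f):
    # (running, done, cancelled) snapshot of a future's state
    return (f.running(), f.done(), f.cancelled())

f = Future()
states = [status(f)]                 # pending
f.set_running_or_notify_cancel()
states.append(status(f))             # running
f.set_result("page")
states.append(status(f))             # finished

g = Future()
g.cancel()
states.append(status(g))             # cancelled

for s in states:
    print(s)
```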

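    • result(self, timeout=None):

      • Returns the result of the call. This is a blocking method: it waits until the future is done, or raises concurrent.futures.TimeoutError after timeout seconds. If the call raised an exception, result() re-raises it. It is also built on the condition variable:
      • it calls the condition's wait() method.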
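A minimal sketch of result() blocking and timing out. A threading.Timer stands in for the worker thread that eventually sets the result; the 0.2 s and 0.05 s values are arbitrary.

```python
import threading
from concurrent.futures import Future, TimeoutError as FutureTimeout

f = Future()
f.set_running_or_notify_cancel()
# Another thread delivers the result after 0.2 s
threading.Timer(0.2, f.set_result, args=("page",)).start()

timed_out = False
try:
    f.result(timeout=0.05)  # gives up before the result arrives
except FutureTimeout:
    timed_out = True

value = f.result()  # blocks until set_result runs
print(timed_out, value)  # True page
```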

    • set_result(self, result):

      • Sets the return value of the work associated with the future.

      • It calls the condition's notify_all() method, waking every thread blocked in result().
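The wait()/notify_all() handshake between result() and set_result() can be sketched with a hypothetical SimpleFuture class (not the library's Future, which also tracks states, exceptions and callbacks). Condition.wait_for() calls wait() in a loop until the predicate holds or the timeout expires.

```python
import threading

class SimpleFuture:
    """Hypothetical minimal future showing the wait()/notify_all() handshake."""

    def __init__(self):
        self._condition = threading.Condition()
        self._done = False
        self._result = None

    def set_result(self, result):
        with self._condition:
            self._result = result
            self._done = True
            self._condition.notify_all()  # wake every thread blocked in result()

    def result(self, timeout=None):
        with self._condition:
            # wait_for() releases the lock while waiting and re-checks _done
            if not self._condition.wait_for(lambda: self._done, timeout):
                raise TimeoutError("result not ready")
            return self._result

fut = SimpleFuture()
results = []
readers = [threading.Thread(target=lambda: results.append(fut.result())) for _ in range(3)]
for t in readers:
    t.start()
fut.set_result(42)
for t in readers:
    t.join()
print(results)  # [42, 42, 42]
```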