Practical article 3
concurrent.futures
- A high-level wrapper that provides a unified interface for multithreading and multiprocessing, so both are easy to call.
- Advantages:
  - The main thread can query the state of a thread or task and retrieve its return value
  - The main thread knows as soon as a thread has finished
```python
import time
from concurrent.futures import ThreadPoolExecutor


def get_html(times):
    time.sleep(times)
    print('get page {} success'.format(times))
    return times


executor = ThreadPoolExecutor(max_workers=2)  # set the maximum number of threads to 2
task1 = executor.submit(get_html, 3)  # submit the task with the submit() method
task2 = executor.submit(get_html, 2)  # submit() returns a Future object

task1.done()    # checks whether the task has completed; does not block
task2.cancel()  # cancels the task; only possible before it has started running
task1.result()  # blocks until the task has finished, then returns its result
```
- Common methods:
- as_completed(fs, timeout=None):
  - A generator that yields each Future as it completes
  - Results are not necessarily returned in the order the tasks were submitted: whichever task completes first is returned first
```python
from concurrent.futures import as_completed

# Get the result of each task as soon as that task completes
urls = [3, 4, 2]
all_task = [executor.submit(get_html, url) for url in urls]
for future in as_completed(all_task):
    # as_completed() hands a future to the main thread as soon as its
    # worker thread finishes, so result() returns immediately here
    data = future.result()
    print("get {} page success".format(data))
```
- map():
  - Similar to Python's built-in map() function: it applies the same function to every item of an iterable of arguments. It is still essentially a generator, but it yields each call's future.result() directly instead of the Future
  - Results are returned in the same order as the items of the iterable argument
```python
# Another way to collect results: map() yields them in input order
urls = [3, 4, 2]
for data in executor.map(get_html, urls):
    print('get {} page success'.format(data))
```
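The ordering difference between as_completed() and map() can be seen side by side in a small sketch. The work() helper and the delay values are made up for illustration; it sleeps far less than get_html so that it finishes quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(delay):
    # Hypothetical stand-in for get_html: sleep, then return the delay.
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(work, d) for d in delays]
    # as_completed() yields futures in the order they finish.
    completion_order = [f.result() for f in as_completed(futures)]
    # map() yields results in the order of the input iterable.
    map_order = list(executor.map(work, delays))

print(completion_order)
print(map_order)
```

With these delays the first list normally comes back sorted by delay, while the second always matches the input order.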
- wait(fs, timeout=None, return_when=ALL_COMPLETED): blocks the main thread until the given futures satisfy the condition set by the return_when parameter (by default, until all of them have completed)
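A minimal sketch of wait() with two different return_when values. The work() helper and its delays are assumptions chosen only to make the difference visible.

```python
import time
from concurrent.futures import (
    ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED,
)


def work(delay):
    # Hypothetical task: sleep, then return the delay.
    time.sleep(delay)
    return delay


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(work, d) for d in (0.4, 0.1)]

    # Returns as soon as at least one future has finished.
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    first_results = [f.result() for f in done]

    # Blocks until every future has finished (the default behaviour).
    done_all, not_done_all = wait(futures, return_when=ALL_COMPLETED)

print(first_results, len(done_all), len(not_done_all))
```

wait() returns two sets, the finished futures and the unfinished ones, so the caller can keep waiting on whatever is left.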
- with ThreadPoolExecutor(n) as executor: the executor can be used with the with statement, which shuts it down when the block exits
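A short sketch of the with form. The square() function is a hypothetical example task.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Hypothetical example task.
    return x * x


with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(square, 7)

# Leaving the with block calls executor.shutdown(wait=True),
# so the future has already finished at this point.
print(future.done(), future.result())  # True 49

rejected = None
try:
    executor.submit(square, 8)
except RuntimeError as exc:
    # New work is rejected once the executor has been shut down.
    rejected = str(exc)
```

This removes the need to call shutdown() by hand and guarantees that all submitted tasks are finished once the block is left.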
- Future
- submit(self, fn, *args, **kwargs): the ThreadPoolExecutor method that creates and returns the Future
```python
# Excerpt from the standard library source (concurrent/futures/thread.py).
# _worker and _threads_queues are defined at module level in that file.
import itertools
import os
import queue
import threading
import weakref
from concurrent.futures import _base


class ThreadPoolExecutor(_base.Executor):

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:  # take the lock to protect the code below
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()  # create the Future object that will be returned
            # _WorkItem is the actual unit of execution in the thread pool;
            # the Future object is passed into it
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)  # put the _WorkItem instance into _work_queue
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:  # check how many threads have been started
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # The thread runs the _worker function, which keeps fetching
            # _WorkItem instances from self._work_queue and running them
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)  # record the started thread in _threads
            _threads_queues[t] = self._work_queue


class _WorkItem(object):
    # This class runs the function and stores the outcome in the Future
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn  # the function to be executed is passed in here
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)  # store the function's result in the Future
```
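The flow in the submit() source above (create a Future, wrap it in a work item, queue it, let a worker thread run it) can be reproduced in a much smaller form. The following is only a sketch of the pattern, not the library's implementation: TinyPool and WorkItem are hypothetical names, the threads are created up front instead of lazily, and the weakref and shutdown-lock handling is left out.

```python
import queue
import threading
from concurrent.futures import Future


class WorkItem:
    """Hypothetical, simplified counterpart of _WorkItem."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # Skip the call if the future was cancelled while still queued.
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
        else:
            self.future.set_result(result)


class TinyPool:
    """Hypothetical toy pool with a fixed number of worker threads."""

    def __init__(self, max_workers):
        self._work_queue = queue.Queue()
        self._threads = []
        for _ in range(max_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self._threads.append(t)

    def _worker(self):
        # Keep taking work items off the queue until a None sentinel arrives.
        while True:
            item = self._work_queue.get()
            if item is None:
                return
            item.run()

    def submit(self, fn, *args, **kwargs):
        future = Future()
        self._work_queue.put(WorkItem(future, fn, args, kwargs))
        return future

    def shutdown(self):
        # One sentinel per thread, then wait for all of them to exit.
        for _ in self._threads:
            self._work_queue.put(None)
        for t in self._threads:
            t.join()


pool = TinyPool(max_workers=2)
futures = [pool.submit(pow, 2, n) for n in range(5)]
results = [f.result() for f in futures]
pool.shutdown()
print(results)  # [1, 2, 4, 8, 16]
```

The caller only ever touches the Future; the queue and the worker threads stay hidden behind submit(), which is the same division of labour as in the real executor.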
- Common methods for Future:
- cancel(self): cancels the future
```python
def cancel(self):
    """Cancel the future if possible.

    Returns True if the future was cancelled, False otherwise. A future
    cannot be cancelled if it is running or has already completed.
    """
    with self._condition:  # uses a condition variable
        # A future that is running or already finished cannot be cancelled
        if self._state in [RUNNING, FINISHED]:
            return False

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True

        self._state = CANCELLED
        self._condition.notify_all()

    self._invoke_callbacks()
    return True
```
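The two branches of cancel() can be observed with a single-worker pool: one task is already running and a second one is still waiting in the queue. The blocker() function and the two Event objects are assumptions used only to hold the first task in the running state.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocker():
    started.set()    # signal that this task is now running
    release.wait()   # stay busy until the main thread releases it
    return 'done'


with ThreadPoolExecutor(max_workers=1) as executor:
    running_task = executor.submit(blocker)
    started.wait()                          # make sure the first task is running
    queued_task = executor.submit(blocker)  # waits in the queue behind it

    cancel_running = running_task.cancel()  # already running, so not cancellable
    cancel_queued = queued_task.cancel()    # still pending, so cancellable
    release.set()

print(cancel_running, cancel_queued, queued_task.cancelled())  # False True True
```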
- cancelled(self): status check, whether the future has been cancelled
- running(self): status check, whether the future is currently running
- done(self): status check, whether the future has completed or been cancelled
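The three status checks can be walked through on a Future created by hand. Futures are normally created by submit(); building one directly here is purely for illustration, and set_running_or_notify_cancel() is the call a worker makes before running the function.

```python
from concurrent.futures import Future

f = Future()  # created by hand only for illustration
states = [(f.running(), f.done())]      # pending

f.set_running_or_notify_cancel()        # what a worker does before calling fn
states.append((f.running(), f.done()))  # running

f.set_result(42)
states.append((f.running(), f.done()))  # finished

c = Future()
c.cancel()                              # cancel a future that never started
cancelled_state = (c.cancelled(), c.done())

print(states)           # [(False, False), (True, False), (False, True)]
print(cancelled_state)  # (True, True)
```

Note that done() is also True for a cancelled future, which matches the description above.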
- result(self, timeout=None):
  - Returns the result of the call. This is a blocking method, and it also relies on the condition variable
  - Internally it calls the condition's wait() method
- set_result(self, result):
  - Sets the return value of the work associated with the future
  - Internally it calls the condition's notify_all() method
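The interplay between result() and set_result() can be sketched with a hand-made Future and a second thread. The producer() function, the 'payload' value and the timings are assumptions for the demonstration.

```python
import threading
import time
from concurrent.futures import Future, TimeoutError

f = Future()  # hand-made future, for illustration only

timed_out = False
try:
    f.result(timeout=0.05)   # nothing has set a result yet
except TimeoutError:
    timed_out = True


def producer():
    time.sleep(0.1)
    f.set_result('payload')  # wakes every thread blocked in result()


threading.Thread(target=producer).start()
start = time.monotonic()
value = f.result()           # blocks until set_result() has been called
waited = time.monotonic() - start

print(value, timed_out)  # payload True
```

The first result() call gives up after its timeout because no result exists yet; the second one sleeps inside the condition's wait() until the producer thread calls set_result().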