0. References
- concurrent.futures — Launching parallel tasks
- concurrent.futures – Manage Pools of Concurrent Tasks
1. Overview
concurrent.futures is a module introduced in Python 3.2 that provides a high-level interface for asynchronously executing callables. Use ThreadPoolExecutor for multithreaded programming and ProcessPoolExecutor for multiprocess programming; both implement the same interface, defined by the abstract Executor class. The module provides two main types, Executor and Future. An Executor manages the pool of workers, and a Future manages the result of a piece of work. Because the executor API is rich, there is usually no need to manipulate Future objects directly.
2. Executor Object
concurrent.futures.Executor is an abstract class that provides methods for executing calls asynchronously. It should not be used directly, but only through its concrete subclasses.
submit(fn, *args, **kwargs) schedules the callable fn to be executed as fn(*args, **kwargs) and returns a Future object representing that execution.
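from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    # Schedule pow(323, 1235) on the pool and block until the result is ready
    future = executor.submit(pow, 323, 1235)
    print(future.result())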
map(func, *iterables, timeout=None, chunksize=1)
Similar to the built-in map(func, *iterables), except that:
- the iterables are collected immediately rather than lazily;
- func is executed asynchronously, and several calls to func may be made concurrently.
It returns an iterator. If __next__() is called on that iterator and the result is not available within timeout seconds of the original call to Executor.map(), the iterator raises concurrent.futures.TimeoutError. timeout can be an int or a float. If timeout is None or not specified, there is no limit to the wait time.
If the func call throws an exception, it will be thrown when the value is retrieved from the iterator.
When using ProcessPoolExecutor, this method splits iterables into chunks that are submitted to the process pool as separate tasks. The approximate size of these chunks can be set by passing a positive integer as chunksize. For very long iterables, a chunksize larger than the default of 1 can significantly improve performance. For ThreadPoolExecutor, chunksize has no effect.
The chunksize parameter was added in Python 3.5.
Note: map always returns results in the order of the inputs, regardless of the order in which the concurrent tasks finish. The iterator returned by map waits for each item's result as the main program iterates over it.
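The two properties described above (results in input order, exceptions raised only on retrieval) can be illustrated with a minimal sketch. The helper names slow_square and reciprocal and the sleep durations are assumptions made up for this illustration, not part of the module.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def slow_square(n):
    # Larger inputs sleep less, so they tend to finish before smaller ones.
    time.sleep((3 - n) * 0.05)
    return n * n


def reciprocal(n):
    # Raises ZeroDivisionError when n == 0.
    return 1 / n


with ThreadPoolExecutor(max_workers=3) as executor:
    # Results come back in input order, not in completion order.
    squares = list(executor.map(slow_square, [0, 1, 2]))

    # The exception surfaces only when the failing item is pulled
    # from the iterator, not when map() is called.
    results = executor.map(reciprocal, [1, 0, 2])
    first = next(results)
    try:
        next(results)
        error = None
    except ZeroDivisionError as exc:
        error = exc

print(squares)
print(first, type(error).__name__)
```

Once the iterator has raised, it is exhausted, so the result for the third input is not retrievable from it; when every result matters, submit() plus per-future error handling may be a better fit.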
shutdown(wait=True) signals the executor that it should free the resources it is using once the currently pending futures have finished executing. Calls to Executor.submit() and Executor.map() made after shutdown raise RuntimeError. If wait is True, this method does not return until all pending futures have finished executing and the resources associated with the executor have been freed. If wait is False, this method returns immediately, and the executor's resources are freed once all pending futures have finished executing. Regardless of the value of wait, the Python program as a whole does not exit until all pending futures have finished executing. You can avoid calling this method explicitly by using the with statement, which shuts the executor down as if shutdown() had been called with wait=True.
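import shutil
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as e:
    # The four copies run concurrently; the with block waits for all of them
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')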
The Executor class implements the context protocol and can be used as a context manager. It can execute tasks concurrently and wait for them all to complete. The shutdown() method is automatically called when the context manager exits.
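To make the shutdown rules concrete, here is a small sketch that calls shutdown() explicitly instead of using the with statement. The variable names are illustrative only.

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(sum, [1, 2, 3])

# wait=True blocks until pending futures finish and resources are released.
executor.shutdown(wait=True)
print(future.result())

try:
    executor.submit(sum, [4, 5, 6])
    rejected = False
except RuntimeError:
    # Submitting to an executor that has been shut down is an error.
    rejected = True

print(rejected)
```

Futures obtained before the shutdown stay usable afterwards; only new submissions are refused.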
3. ThreadPoolExecutor
ThreadPoolExecutor is a subclass of Executor that uses a pool of threads to execute calls asynchronously. It manages a set of worker threads and hands tasks to them as they become free. A deadlock can occur when the callable associated with a Future waits on the result of another Future. Here is an example:
import time
from concurrent.futures import ThreadPoolExecutor

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6

executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
Here’s another example:
def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
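One possible way to avoid the single-worker deadlock, assuming the work can be split into independent steps, is to let the calling code wait on each future rather than having a worker wait on another future. The helper names compute_power and describe are hypothetical and exist only for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor


def compute_power(base, exponent):
    # First step: pure computation that never touches the executor.
    return pow(base, exponent)


def describe(value):
    # Second step: consumes the result of the first step.
    return f"result is {value}"


with ThreadPoolExecutor(max_workers=1) as executor:
    # The caller, not a worker thread, waits on the first future,
    # so the single worker is free to run each step in turn.
    power = executor.submit(compute_power, 5, 2).result()
    message = executor.submit(describe, power).result()

print(message)
```

The design choice is that no task ever blocks on a future owned by the same pool, so the pool size no longer affects whether the program can make progress.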
concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
This Executor subclass uses a pool of at most max_workers threads to execute calls asynchronously.
initializer is an optional callable that is called at the start of each worker thread; initargs is a tuple of arguments passed to initializer. If initializer raises an exception, all currently pending jobs raise BrokenThreadPool, as does any attempt to submit more jobs to the pool.
Changed in 3.5: If max_workers is None or not given, it defaults to the number of processors on the machine multiplied by 5.
New in 3.6: The thread_name_prefix parameter was added so that users can control the names of the worker threads created by the pool, which makes debugging easier.
Changed in 3.7: The initializer and initargs parameters were added.
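The constructor parameters above can be tried out with a short sketch, assuming Python 3.7 or later. The names local_state, init_worker, and greet, and the prefix "greeter", are assumptions chosen for the illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Thread-local storage: each worker thread gets its own `greeting` attribute.
local_state = threading.local()


def init_worker(greeting):
    # Runs once at the start of every worker thread.
    local_state.greeting = greeting


def greet(name):
    # Reads the value that init_worker stored for the current thread.
    thread_name = threading.current_thread().name
    return f"{local_state.greeting}, {name}", thread_name


with ThreadPoolExecutor(
    max_workers=2,
    thread_name_prefix="greeter",
    initializer=init_worker,
    initargs=("hello",),
) as executor:
    results = list(executor.map(greet, ["a", "b", "c"]))

for text, thread_name in results:
    print(text, thread_name)
```

Each printed thread name starts with the chosen prefix, which is what makes worker threads easy to spot in logs or a debugger.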
4. ThreadPoolExecutor example
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
5. ProcessPoolExecutor
ProcessPoolExecutor is a subclass of Executor that uses a pool of processes to execute calls asynchronously. It uses the multiprocessing module, which allows it to sidestep the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
The __main__ module must be importable by worker subprocesses, which means that ProcessPoolExecutor does not work in the interactive interpreter.
Calling Executor or Future methods from a callable submitted to a ProcessPoolExecutor results in a deadlock.
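The picklability requirement can be checked ahead of time without starting any worker process. The sketch below uses the standard pickle module as a rough proxy for what the process pool has to do; is_picklable is a hypothetical helper, not part of concurrent.futures.

```python
import math
import pickle


def is_picklable(obj):
    # Hypothetical helper: try to serialize obj with the standard pickle module.
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# A function that can be looked up by name pickles fine; a lambda does not.
print(is_picklable(math.sqrt))
print(is_picklable(lambda x: x * x))
# Arguments and return values must be picklable too.
print(is_picklable([1, 2.5, "text"]))
```

In practice this means tasks for a ProcessPoolExecutor are usually plain module-level functions taking and returning simple data.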
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
This Executor subclass uses a pool of at most max_workers processes to execute calls asynchronously. If max_workers is None or not given, it defaults to the number of processors on the machine. If max_workers is less than or equal to 0, ValueError is raised. mp_context can be a multiprocessing context or None, and is used to launch the workers. If mp_context is None or not given, the default multiprocessing context is used.
initializer is an optional callable that is called at the start of each worker process; initargs is a tuple of arguments passed to initializer. If initializer raises an exception, all currently pending jobs raise BrokenProcessPool, as does any attempt to submit more jobs to the pool.
Changed in version 3.3: When one of the worker processes terminates abruptly, a BrokenProcessPool error is now raised. Previously, the behavior was undefined, and operations on the executor or its futures would often freeze or deadlock.
Changed in version 3.7: The mp_context parameter was added to allow users to control the start method of the worker processes created by the pool. The initializer and initargs parameters were also added.
6. ProcessPoolExecutor example
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    # Only odd divisors up to the square root need to be checked
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
7. The Future object
The concurrent.futures.Future class encapsulates the asynchronous execution of a callable. Future instances are created by Executor.submit() and should not be created directly except for testing.
cancel()
Attempt to cancel the call. If the call is currently being executed and cannot be cancelled, the method returns False; otherwise the call is cancelled and the method returns True.
Understanding: A Future that has been submitted but has not yet started executing can be cancelled by calling its cancel() method.
cancelled()
Returns True if the call was successfully cancelled.
running()
Returns True if the call is currently being executed and cannot be cancelled.
done()
Returns True if the call was successfully cancelled or finished running.
result(timeout=None)
Returns the value returned by the call. If the call has not yet completed, this method waits up to timeout seconds. If the call has not completed within timeout seconds, concurrent.futures.TimeoutError is raised. timeout can be an int or a float. If timeout is None or not specified, there is no limit to the wait time.
If the future is cancelled before completing, CancelledError is raised.
If the call raised an exception, this method raises the same exception.
Understanding: result() blocks until the task has completed or been cancelled. If the results need to be consumed in submission order, use the executor's map() method; if order does not matter, use the module function as_completed().
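The state methods and result() described above can be observed together in a small sketch. It assumes a pool with a single worker and uses two threading.Event objects (started and release, names chosen for this illustration) so that the first task stays in the running state for as long as the main thread wants.

```python
import threading
from concurrent import futures

started = threading.Event()   # set by the worker once the first task is running
release = threading.Event()   # set by the main thread to let the first task finish


def blocked_task():
    started.set()
    release.wait(timeout=10)  # safety timeout so the sketch cannot hang forever
    return "finished"


with futures.ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(blocked_task)
    second = executor.submit(pow, 2, 10)   # queued behind `first`

    started.wait(timeout=10)
    states = (first.running(), first.done(), second.running())

    # A running call cannot be cancelled; a queued one can.
    cancel_first = first.cancel()
    cancel_second = second.cancel()

    try:
        first.result(timeout=0.1)          # still blocked, so this times out
        timed_out = False
    except futures.TimeoutError:
        timed_out = True

    release.set()
    first_value = first.result()           # now returns normally

try:
    second.result()
    second_outcome = "returned"
except futures.CancelledError:
    second_outcome = "cancelled"

print(states, cancel_first, cancel_second, timed_out, first_value, second_outcome)
```

Note that a cancelled future also reports done() as True, which is why result() on it raises CancelledError instead of blocking.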
exception(timeout=None)
Returns the exception raised by the call. If the call has not yet completed, this method waits up to timeout seconds. If the call has not completed within timeout seconds, concurrent.futures.TimeoutError is raised. timeout can be an int or a float. If timeout is None or not specified, there is no limit to the wait time. If the future is cancelled before completing, CancelledError is raised. If the call completed without raising, None is returned.
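As a brief illustration of the difference between exception() and result(), consider the following sketch. parse_port is a hypothetical task function invented for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def parse_port(text):
    # int() raises ValueError for non-numeric input.
    return int(text)


with ThreadPoolExecutor(max_workers=2) as executor:
    good = executor.submit(parse_port, "8080")
    bad = executor.submit(parse_port, "http")

# exception() returns the exception object instead of raising it.
print(good.exception())
print(repr(bad.exception()))

# result() raises that exception in the caller.
try:
    bad.result()
    reraised = None
except ValueError as exc:
    reraised = exc

print(reraised is bad.exception())
```

Using exception() is convenient when failures should be collected and reported rather than handled with try/except at each call site.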
add_done_callback(fn)
Attaches the callable fn to the future. fn is called with the future as its only argument when the future is cancelled or finishes running. Added callables are called in the order in which they were added and are always called in a thread belonging to the process that added them. If the callable raises an exception that is an Exception subclass, it is logged and ignored. If it raises an exception that is a BaseException subclass, the behavior is undefined. If the future has already completed or been cancelled, fn is called immediately.
Understanding: Instead of explicitly waiting for the result, you can use this method to specify in advance what should be called when the Future completes. fn is a callable that takes a single argument. It is best to check the state of the Future passed in before using it, because a Future counts as done whether it finished normally, raised an exception, or was cancelled.
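A minimal sketch of a callback that checks the future's state before using it, as recommended above. The names outcomes and record_outcome are assumptions for this illustration, and a single worker is used so that the tasks finish in submission order.

```python
from concurrent.futures import ThreadPoolExecutor

outcomes = []


def record_outcome(future):
    # Check the state first: "done" covers success, failure, and cancellation.
    if future.cancelled():
        outcomes.append("cancelled")
    elif future.exception() is not None:
        outcomes.append(f"failed: {future.exception()!r}")
    else:
        outcomes.append(f"ok: {future.result()}")


with ThreadPoolExecutor(max_workers=1) as executor:
    ok = executor.submit(divmod, 7, 2)
    failed = executor.submit(divmod, 7, 0)
    ok.add_done_callback(record_outcome)
    failed.add_done_callback(record_outcome)

# Leaving the with block waits for the worker, so both callbacks have run.
# Adding a callback to a future that is already done calls it immediately.
ok.add_done_callback(record_outcome)

for line in outcomes:
    print(line)
```

Because the callback may run in a worker thread, it should be short and should not block; here it only appends to a list.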
8. Module functions
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
Waits for the Future instances given by fs to complete; they may have been created by several different Executor instances. Returns a named 2-tuple of sets. The first set, named done, contains the futures that completed (finished or were cancelled) before the wait ended. The second set, named not_done, contains the futures that did not complete. timeout controls the maximum number of seconds to wait before returning. It can be an int or a float. If timeout is None or not specified, there is no limit to the wait time. return_when indicates when the function should return. It must be one of the following constants:
- FIRST_COMPLETED: the function returns when any future finishes or is cancelled.
- FIRST_EXCEPTION: the function returns when any future finishes by raising an exception. If no future raises an exception, it is equivalent to ALL_COMPLETED.
- ALL_COMPLETED: the function returns when all futures finish or are cancelled.
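The return_when constants listed above can be compared in a short sketch. It assumes two tasks, one of which is held back by a threading.Event until the main thread releases it; the names quick, held, and release are made up for the illustration.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

release = threading.Event()


def quick():
    return "quick"


def held():
    # Blocks until the main thread sets the event (with a safety timeout).
    release.wait(timeout=10)
    return "held"


with ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(quick)
    slow = executor.submit(held)

    # Returns as soon as any future is done; `slow` is still blocked.
    first = wait([fast, slow], return_when=FIRST_COMPLETED)
    first_done = {f.result() for f in first.done}
    still_pending = len(first.not_done)

    release.set()
    # With the default ALL_COMPLETED, both futures end up in `done`.
    final = wait([fast, slow])
    all_done = sorted(f.result() for f in final.done)

print(first_done, still_pending, all_done)
```

The returned value is a named tuple, so the two sets can be read either as first.done and first.not_done or by unpacking.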
concurrent.futures.as_completed(fs, timeout=None)
Returns an iterator over the Future instances given by fs that yields each future as it completes (finishes or is cancelled). The futures in fs may have been created by different Executor instances. Any futures that completed before as_completed() was called are yielded first.
Looking at the source code, it turns out that this is actually a generator function that uses yield from, so the call returns a generator.
The returned iterator raises concurrent.futures.TimeoutError if __next__() is called and the result is not available within timeout seconds of the original call to as_completed(). timeout can be an int or a float. If timeout is None or not specified, there is no limit to the wait time.
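To show that as_completed() follows completion order rather than submission order, the sketch below forces a known completion order. It assumes one threading.Event "gate" per task, opened from the main thread; gates, open_order, and gated are hypothetical names used only here.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

# One gate per task; a task finishes only after its gate is opened.
gates = [threading.Event() for _ in range(3)]
open_order = [2, 0, 1]   # completion order chosen for the demonstration


def gated(index):
    gates[index].wait(timeout=10)   # safety timeout so the sketch cannot hang
    return index


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(gated, i) for i in range(3)]

    remaining = iter(open_order)
    gates[next(remaining)].set()    # let the first chosen task finish

    completion_order = []
    for future in as_completed(futures, timeout=30):
        completion_order.append(future.result())
        # Open the next gate only after the previous future was yielded.
        nxt = next(remaining, None)
        if nxt is not None:
            gates[nxt].set()

print(completion_order)
```

In real code the completion order is decided by how long each task takes; the gates exist only to make the outcome of this sketch predictable.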
9. To be completed
- Future internal methods (for unit tests and for implementing custom executors): set_running_or_notify_cancel(), set_result(result), set_exception(exception)
- Related exception classes
Completed on 2019.02.18