Reference: www.cnblogs.com/venvive/p/1…
1 / ThreadPoolExecutor multithreading
<1> Why do we need a thread pool?
If you create 20 threads but only allow at most 3 to run at a time, all 20 threads still have to be created and destroyed, and each one sits in a queued state until it gets a chance to execute; creating a thread consumes system resources. The idea of a thread pool is that each pooled thread is assigned one task while the remaining tasks wait in a queue, and when a thread finishes its task, a queued task is handed to that same thread to continue.
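The next section introduces ThreadPoolExecutor; as a quick sketch of the reuse just described (the task body and counts are illustrative, not from the original), with max_workers=3 the pool never creates more than 3 threads for 20 tasks:

# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time

def work(n):
    time.sleep(0.1)               # simulate a short task
    return current_thread().name  # report which pooled thread ran this task

with ThreadPoolExecutor(max_workers=3) as pool:
    tasks = [pool.submit(work, i) for i in range(20)]
    names = [t.result() for t in tasks]

# All 20 tasks were handled by at most 3 worker threads
print(len(set(names)))  # -> 3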
<2> Standard library concurrent.futures module
It provides the classes ThreadPoolExecutor and ProcessPoolExecutor, which are further abstractions over threading and multiprocessing, respectively.
- The main thread can get the status of a thread (or task) as well as its return value
- The main thread knows immediately when a thread has finished
- The code interfaces for multithreading and multiprocessing are consistent
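As a quick illustration of that last point (the function and inputs below are illustrative): the same code drives either pool, swapping only the executor class.

# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):
    return x * x

def run(executor_cls):
    # Identical code works for both thread and process pools
    with executor_cls(max_workers=2) as executor:
        return list(executor.map(square, [1, 2, 3]))

if __name__ == '__main__':
    print(run(ThreadPoolExecutor))   # [1, 4, 9]
    print(run(ProcessPoolExecutor))  # [1, 4, 9]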
<3> Basic usage
# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import time

# The times parameter is used to simulate the network request time
def get_html(times):
    print("get page {}s finished".format(times))
    return times

# Create a thread pool
# max_workers sets the maximum number of threads that can run at the same time
executor = ThreadPoolExecutor(max_workers=2)

# submit() hands a function to the thread pool and returns immediately without blocking
# task1 and task2 are task handles (Future objects)
task1 = executor.submit(get_html, 2)
task2 = executor.submit(get_html, 3)

# done() tells whether a task has completed: True if finished, False if not
print(task1.done())
# cancel() cancels a task; a task can only be cancelled while it is still
# queued, not once it has started running; returns True on success, False on failure
print(task2.cancel())
# result() fetches the return value of get_html(), since the function returns one
print(task1.result())
print(task2.result())

# Output:
# get page 3s finished
# get page 2s finished
# True
# False
# 2
# 3

# When constructing a ThreadPoolExecutor instance, the max_workers parameter sets
# the maximum number of threads that can run concurrently in the pool.
# submit() hands a task (function name plus arguments) to the pool and returns a
# task handle. Note that submit() does not block; it returns immediately.
# Through that handle you can call done() to check whether the task has finished
# and cancel() to cancel it.
# result() fetches the task's return value; looking at the internal code, the
# method blocks until the result is available.
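One detail the example above does not show: if the task function raises, the exception is stored in the task handle and re-raised when the main thread calls result(), while exception() returns it without raising. A small sketch:

# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor

def fail():
    raise ValueError("boom")

executor = ThreadPoolExecutor(max_workers=1)
task = executor.submit(fail)

print(task.exception())  # returns the ValueError without raising it
try:
    task.result()        # re-raises the task's exception in the main thread
except ValueError as e:
    print("caught:", e)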
<4> as_completed() (get all the results at once)
The methods above can tell whether a single task has finished, but the main thread should not have to keep polling each task. Often we just want to be notified when a task ends and then fetch its result, rather than checking task by task. For this you can use the as_completed() method to retrieve the results of all tasks as they finish.
# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# The times parameter is used to simulate the network request time
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

# Create a thread pool
# At most 2 threads run at the same time; the others wait
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]

# Submit all tasks to the pool at once; only 2 execute at the same time
all_task = [executor.submit(get_html, url) for url in urls]
for future in as_completed(all_task):
    data = future.result()
    print("in main:get page {}s success".format(data))

# Output:
# get page 2s finished
# in main:get page 2s success
# get page 3s finished
# in main:get page 3s success
# get page 4s finished
# in main:get page 4s success

# As the output shows, the first url submitted is not necessarily executed
# first: as_completed() yields each task in the order it completes
<5> map() method
Besides the as_completed() method above, you can also use the executor.map() method; unlike submit(), the tasks do not need to be submitted in advance. executor.map() has the same meaning as the built-in map() in Python: it executes the same function for each element of a sequence. The code below simply runs the get_html() function on every element of the urls list, each call assigned to the thread pool. Note that the result differs from the as_completed() example above: the output order matches the urls list. Even though the 2s task finishes first, map() yields the 3s result first and then the 2s result, because results come back in submission order.
# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import time

# The times parameter is used to simulate the network request time
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

# Create a thread pool
# At most 2 threads run at the same time; the others wait
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]

# map() yields results in the same order as the input list
for result in executor.map(get_html, urls):
    print("in main:get page {}s success".format(result))

# Output:
# get page 2s finished
# get page 3s finished
# in main:get page 3s success
# in main:get page 2s success
# get page 4s finished
# in main:get page 4s success
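A side note on cleanup: the executor can also be used as a context manager, which calls shutdown(wait=True) automatically when the block exits. A minimal sketch with the same get_html():

# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    return times

# The with-block shuts the pool down and joins the worker threads on exit
with ThreadPoolExecutor(max_workers=2) as executor:
    for result in executor.map(get_html, [3, 2, 4]):
        print("in main:get page {}s success".format(result))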
<6> wait() method
The wait() method lets the main thread block until a specified condition is met. It takes three parameters: the sequence of tasks to wait for, a timeout, and a wait condition. The wait condition return_when defaults to ALL_COMPLETED, meaning wait for all tasks to finish; it can also be set to FIRST_COMPLETED, meaning stop waiting as soon as the first task completes. The timeout parameter is optional. The wait() method is independent of as_completed() and map(): whether you use as_completed() or map(), you can always call wait() before the main thread continues, and as_completed() and map() are themselves optional.
# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time

# The times parameter is used to simulate the network request time
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

# Create a thread pool
# At most 2 threads run at the same time; the others wait
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]

all_task = [executor.submit(get_html, url) for url in urls]
# Block the main thread until all tasks have completed
wait(all_task, return_when=ALL_COMPLETED)
print("main")

# Output:
# get page 2s finished
# get page 3s finished
# get page 4s finished
# main
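As a variation (a sketch, not from the original): return_when=FIRST_COMPLETED unblocks as soon as the quickest task finishes, and the optional timeout caps how long wait() blocks. wait() returns the sets of done and pending futures.

# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def get_html(times):
    time.sleep(times)
    return times

executor = ThreadPoolExecutor(max_workers=2)
all_task = [executor.submit(get_html, t) for t in [3, 2, 4]]

# Unblock as soon as the quickest task (the 2s one) finishes
done, not_done = wait(all_task, return_when=FIRST_COMPLETED)
print(len(done), len(not_done))  # 1 2

# Alternatively, give up after at most 1 more second; anything
# still running stays in not_done
done, not_done = wait(all_task, timeout=1)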
2 / ProcessPoolExecutor multiprocessing
<1> Synchronous invocation: submit the task, then wait in place for its return value; decoupled, but slow
import datetime
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread
import time, random, os
import requests

def task(name):
    print('%s %s is running' % (name, os.getpid()))
    # print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

if __name__ == '__main__':
    p = ProcessPoolExecutor(4)  # Set the number of processes in the pool
    for i in range(10):
        # Synchronous call: submit the task, then wait for its return value
        obj = p.submit(task, "Process pid:")  # Pass arguments positionally or by keyword
        res = obj.result()
    p.shutdown(wait=True)  # Stop accepting new tasks and wait for the pool's tasks to finish
    print("main")

################
################
# Another demo of synchronous calls
def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = "Download failed"
    return res  # Return the page content

def parse(res):
    time.sleep(1)
    print("%s parse result is %s" % (os.getpid(), len(res)))

if __name__ == "__main__":
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    p = ProcessPoolExecutor(9)
    l = []
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
        l.append(future)
    # Close the pool and wait for all processes to finish
    p.shutdown(wait=True)
    # Results are consumed serially in the main process, so get and parse stay decoupled
    for future in l:
        parse(future.result())
    print('Completion time:', time.time() - start)
    # Completion time: 13.209137678146362
<2> Asynchronous invocation: submit the task without waiting for a return value; possibly coupled, but fast
def task(name):
    print("%s %s is running" % (name, os.getpid()))
    time.sleep(random.randint(1, 3))

if __name__ == '__main__':
    p = ProcessPoolExecutor(4)  # Set the number of processes in the pool
    for i in range(10):
        # Asynchronous call: submit only, without waiting for a return value
        p.submit(task, 'Process pid:')  # Pass arguments positionally or by keyword
    p.shutdown(wait=True)  # Stop accepting new tasks and wait for the pool's tasks to finish
    print('main')

##################
##################
# Another demo of asynchronous calls
def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = "Download failed"
    parse(res)  # get calls parse directly instead of returning a value: fast, but coupled

def parse(res):
    time.sleep(1)
    print('%s parse result is %s' % (os.getpid(), len(res)))

if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    p = ProcessPoolExecutor(9)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
    p.shutdown(wait=True)
    print("Completion time", time.time() - start)
    # Completion time 6.293345212936401
<3> How can we use asynchronous invocation while avoiding the coupling problem?
(1) Process pool: asynchronous call + callback function. Mainly for CPU-intensive work; processes execute simultaneously, and each process has its own interpreter and memory space, so they do not interfere with one another.
def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = 'Download failed'
    return res

def parse(future):
    time.sleep(1)
    # The callback receives a Future object; call result() to get the return value
    res = future.result()
    print('%s parse result is %s' % (os.getpid(), len(res)))

if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    p = ProcessPoolExecutor(9)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
        # parse runs automatically with the future once it completes;
        # the callback is equivalent to calling parse(future)
        future.add_done_callback(parse)
    p.shutdown(wait=True)
    print("Completion time", time.time() - start)
    # Completion time 33.79998469352722
(2) Thread pool: asynchronous call + callback function. Mainly for IO-intensive work; whichever thread in the pool is free picks up the next operation.
def get(url):
    print("%s GET %s" % (current_thread().name, url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = "Download failed"
    return res

def parse(future):
    time.sleep(1)
    res = future.result()
    print("%s parse result is %s" % (current_thread().name, len(res)))

if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    p = ThreadPoolExecutor(4)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
        future.add_done_callback(parse)
    p.shutdown(wait=True)
    print("main", current_thread().name)
    print("Completion time", time.time() - start)
    # Completion time 32.52604126930237
3 / Summary
1. More threads is not always better: too many threads force CPU context switches (the CPU must save the state of the previous thread each time it switches).
2. A process consumes more resources than a thread. A process is like a factory: the people inside it share the factory's resources. A process contains only one main thread by default; starting a program starts a process, and the work inside it is done by threads. Creating more threads simply puts more people to work at the same time.
3. A thread is the smallest unit of work in the computer.
4. A process has a main thread by default, and multiple threads can coexist within it.
5. GIL (Global Interpreter Lock): ensures that only one thread is scheduled by the CPU at a time.
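To make the GIL point concrete, a sketch (the workload and sizes are illustrative): for a pure-Python CPU-bound function, a thread pool gives little or no speedup because only one thread holds the interpreter at a time, while a process pool can use multiple cores.

# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def count(n):
    # Pure-Python CPU-bound loop; the running thread holds the GIL throughout
    while n > 0:
        n -= 1

if __name__ == '__main__':
    jobs = [10_000_000] * 4
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.time()
        with pool_cls(max_workers=4) as pool:
            list(pool.map(count, jobs))
        # Threads: roughly serial time because of the GIL
        # Processes: roughly a quarter of that on a 4-core machine
        print(pool_cls.__name__, time.time() - start)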