Preface

Starting with Python 3.2, the standard library provides the concurrent.futures module, which offers the ThreadPoolExecutor (thread pool) and ProcessPoolExecutor (process pool) classes.

Compared with the threading module and other low-level APIs, this module's submit() returns a Future object, which can be used to learn the state of a thread or task. Through it, the main thread (or process) can obtain the execution state of a thread (or process) or of a task, along with its return value:

  • The main thread can obtain the state of a thread (or task) and its return value.
  • The main thread knows when a thread (or task) has finished.
  • Multithreaded and multiprocess code share a consistent interface (a short sketch follows below).
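To illustrate the last point, the two executors expose the same submit()/result() interface, so swapping a thread pool for a process pool is usually a one-line change. A minimal sketch, where the square() function is just an illustrative placeholder:

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def square(n):
    return n * n


if __name__ == "__main__":
    # The same submit()/result() calls work for a thread pool...
    with ThreadPoolExecutor(max_workers=2) as executor:
        print(executor.submit(square, 3).result())  # 9

    # ...and for a process pool, without changing the calling code
    with ProcessPoolExecutor(max_workers=2) as executor:
        print(executor.submit(square, 3).result())  # 9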

Basic use of thread pools

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import time


def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

with ThreadPoolExecutor(max_workers=5) as t:  # Create a thread pool with a maximum capacity of 5 threads
    task1 = t.submit(spider, 1)
    task2 = t.submit(spider, 2)  # Submit the function to be executed to the thread pool via submit()
    task3 = t.submit(spider, 3)

    print(f"task1: {task1.done()}")  The thread is done
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")

    time.sleep(2.5)
    print(f"task1: {task1.done()}")
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")
    print(task1.result())  # Get the return value via result()

The result is as follows:

task1: False
task2: False
task3: False
crawl task1 finished
crawl task2 finished
task1: True
task2: True
task3: False
1
crawl task3 finished

1. Construct a ThreadPoolExecutor instance using the with statement, passing in the max_workers parameter to set the maximum number of threads running concurrently in the pool.

2. Use the submit function to submit a task to the thread pool; it returns a handle to the task (similar to a file handle). Note that submit() does not block, but returns immediately.

3. Use the done() method to check whether a task has finished. As the example shows, the task status is checked immediately after submission, so none of the three tasks has completed. After a delay of 2.5 seconds, task1 and task2 have completed, but task3 is still executing.

4. Use the result() method to get the return value of the task; a short sketch of its blocking behaviour follows.
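result() blocks until the task has finished, and it also accepts an optional timeout after which concurrent.futures.TimeoutError is raised. A minimal sketch, using a hypothetical slow() helper:

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time


def slow(seconds):
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=1) as t:
    task = t.submit(slow, 5)
    try:
        # Wait at most 1 second for the return value
        print(task.result(timeout=1))
    except TimeoutError:
        print("result not ready within 1 second")
    # Leaving the with block still waits for the task itself to finish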

Main methods

wait
wait(fs, timeout=None, return_when=ALL_COMPLETED)
  • wait() takes three arguments:
    • fs: the sequence of futures to wait for
    • timeout: the maximum time to wait; after this time, wait() returns even if the tasks have not finished executing
    • return_when: the condition under which to return a result; the default is ALL_COMPLETED

Again using the example above, here is how it is used:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time

def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

with ThreadPoolExecutor(max_workers=5) as t: 
    all_task = [t.submit(spider, page) for page in range(1, 5)]
    wait(all_task, return_when=FIRST_COMPLETED)
    print('finished')
    print(wait(all_task, timeout=2.5))

Run result
crawl task1 finished
finished
crawl task2 finished
crawl task3 finished
DoneAndNotDoneFutures(done={<Future at 0x28c8710 state=finished returned int>, <Future at 0x2c2bfd0 state=finished returned int>, <Future at 0x2c1b7f0 state=finished returned int>}, not_done={<Future at 0x2c3a240 state=running>})
crawl task4 finished

1. The return condition in the code is FIRST_COMPLETED: as soon as the first task finishes, wait() stops blocking and the main thread continues. The second wait() call, with a 2.5 second timeout, shows that only task4 is still running. (The return value of wait() can also be unpacked directly; see the sketch below.)
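wait() returns a named tuple of two sets, done and not_done, which can be unpacked instead of printed as a whole. A minimal sketch, reusing the same spider() function:

from concurrent.futures import ThreadPoolExecutor, wait
import time


def spider(page):
    time.sleep(page)
    return page


with ThreadPoolExecutor(max_workers=5) as t:
    all_task = [t.submit(spider, page) for page in range(1, 5)]
    # wait() returns a named tuple of two sets: done and not_done
    done, not_done = wait(all_task, timeout=2.5)
    print(f"{len(done)} finished, {len(not_done)} still running")
    for future in done:
        print(future.result())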

as_completed

The methods above give us ways to check whether a task has finished, but the main thread should not have to keep polling. A better approach is for the result to be handed back to the main thread as soon as a task finishes, instead of repeatedly checking whether each task is done. The as_completed() function in concurrent.futures is exactly such a method: after a worker thread finishes its task, the main thread can fetch the return value directly with result().

Usage:

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

def main():
    with ThreadPoolExecutor(max_workers=5) as t:
        obj_list = []
        for page in range(1, 5):
            obj = t.submit(spider, page)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(f"main: {data}")

# Execution result
crawl task1 finished
main: 1
crawl task2 finished
main: 2
crawl task3 finished
main: 3
crawl task4 finished
main: 4

The as_completed() method is a generator. It blocks as long as no task has completed, unless a timeout is set.

When a task completes, it yields that task's future, the body of the for loop runs, and it then blocks again until all tasks have finished. The tasks that complete first are also returned to the main thread first.
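A common pattern with as_completed() is to keep a dictionary mapping each future back to its input, so the main thread knows which task produced each result even though results arrive in completion order. A minimal sketch:

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def spider(page):
    time.sleep(page)
    return page


with ThreadPoolExecutor(max_workers=5) as t:
    # Map each future back to the page it was submitted with
    future_to_page = {t.submit(spider, page): page for page in range(1, 5)}
    for future in as_completed(future_to_page):
        page = future_to_page[future]
        print(f"page {page} returned {future.result()}")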

map
map(fn, *iterables, timeout=None)
  • fn: the first argument is the function to be executed by the threads;
  • iterables: the second argument takes one or more iterables;
  • timeout: the third argument has the same meaning as timeout in wait(). However, since map() returns the results of thread execution, a TimeoutError is raised if timeout is shorter than the thread's execution time (see the sketch after this list).
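To make the timeout behaviour concrete, here is a minimal sketch (the sleep lengths are illustrative); if any result is not ready within the deadline, iterating the generator raises concurrent.futures.TimeoutError:

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time


def spider(page):
    time.sleep(page)
    return page


with ThreadPoolExecutor(max_workers=4) as t:
    try:
        # Each result must be available within 2 seconds of the map() call
        for result in t.map(spider, [1, 3], timeout=2):
            print(result)
    except TimeoutError:
        print("a task did not finish within the timeout")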

Usage:

import time
from concurrent.futures import ThreadPoolExecutor

def spider(page):
    time.sleep(page)
    return page

start = time.time()
executor = ThreadPoolExecutor(max_workers=4)

i = 1
for result in executor.map(spider, [2, 3, 1, 4]):
    print("task{}:{}".format(i, result))
    i += 1

Run result
task1:2
task2:3
task3:1
task4:4

To use the map() method, there is no need to call submit() first. map() has the same meaning as Python's built-in higher-order map function: it executes the same function for each element of a sequence.

The code above runs the spider() function on each element of the list, with each call assigned to a thread in the pool.

You can see that the result differs from the as_completed() example above: the output order matches the order of the list. Even though the 1-second task finishes first, the results of tasks submitted earlier are printed first.
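Since map() accepts *iterables, it can also be given several iterables at once, taking one element from each per call. A minimal sketch with a hypothetical add() function:

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor


def add(a, b):
    return a + b


with ThreadPoolExecutor(max_workers=2) as t:
    # One element is taken from each iterable per call: add(1, 10), add(2, 20), add(3, 30)
    for result in t.map(add, [1, 2, 3], [10, 20, 30]):
        print(result)  # prints 11, 21, 31 in order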

Practical examples

Multithreaded version

Take a website as an example to demonstrate the difference between using a thread pool and a single thread.

# coding: utf-8
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
from requests import adapters

from proxy import get_proxies

headers = {
    "Host": "splcgk.court.gov.cn"."Origin": "https://splcgk.court.gov.cn"."User-Agent": "Mozilla / 5.0 (Windows NT 6.1; Win64; X64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"."Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg",
}
url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1"

def spider(page):
    data = {
        "bt": "",
        "fydw": "",
        "pageNum": page,
    }
    for _ in range(5):
        try:
            response = requests.post(url, headers=headers, data=data, proxies=get_proxies())
            json_data = response.json()
        except (json.JSONDecodeError, adapters.SSLError):
            continue
        else:
            break
    else:
        return {}

    return json_data

def main():
    with ThreadPoolExecutor(max_workers=10) as t:
        obj_list = []
        begin = time.time()
        for page in range(1, 15):
            obj = t.submit(spider, page)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(data)
            print('*' * 50)
        times = time.time() - begin
        print(times)

if __name__ == "__main__":
    main()
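The example imports get_proxies from a local proxy module that is not shown here. If you want to run the code without that module, a hypothetical placeholder might look like the following (returning an empty dict makes requests connect directly, without a proxy):

# proxy.py -- hypothetical stand-in for the local proxy module imported above
def get_proxies():
    # Return a requests-style proxies dict, for example:
    # {"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"}
    # An empty dict makes requests connect directly, without a proxy.
    return {}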

Single-threaded version

Here we crawl with a single thread instead. The code is basically the same as above; just add a single-threaded function as follows:

# coding: utf-8
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
from requests import adapters

from proxy import get_proxies

headers = {
    "Host": "splcgk.court.gov.cn"."Origin": "https://splcgk.court.gov.cn"."User-Agent": "Mozilla / 5.0 (Windows NT 6.1; Win64; X64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"."Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg",
}
url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1"

def spider(page):
    data = {
        "bt": "",
        "fydw": "",
        "pageNum": page,
    }
    for _ in range(5):
        try:
            response = requests.post(url, headers=headers, data=data, proxies=get_proxies())
            json_data = response.json()
        except (json.JSONDecodeError, adapters.SSLError):
            continue
        else:
            break
    else:
        return {}

    return json_data

def single():
    begin = time.time()
    for page in range(1, 15):
        data = spider(page)
        print(data)
        print('*' * 50)

    times = time.time() - begin
    print(times)


if __name__ == "__main__":
    single()

Running results: