1. Use of Queue message queues


1.1 Functions of Queue

1. Python's Queue module provides a FIFO (first-in, first-out) implementation for multithreaded programming.
2. It handles data and message transfer between multiple threads; several threads can safely share the same Queue instance.
3. The size of the Queue (the number of elements it holds) can be used to limit memory usage.
4. The multiprocessing module offers equivalent queues for transferring data and messages between multiple processes.
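A minimal sketch of the first and third points (FIFO ordering, and using maxsize to bound how much data sits in memory); the values here are purely illustrative:

from queue import Queue, Full

q = Queue(maxsize=2)       # maxsize bounds how many items the queue will hold
q.put("first")
q.put("second")

try:
    q.put_nowait("third")  # the queue is full, so the non-blocking put raises queue.Full
except Full:
    print("queue is full, memory use stays bounded")

print(q.get())             # 'first'  - items come out in the order they went in (FIFO)
print(q.get())             # 'second'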

1.2 Use of Queue

1.2.1 Importing Queue

from queue import Queue   # applies to threads and coroutines
from multiprocessing import JoinableQueue as Queue  # used for processes

1.2.2 Creating a Queue and Common Methods

q = Queue(maxsize=100)  # maxsize specifies the maximum queue length
item = {}
q.put_nowait(item)  # put data without blocking; raises queue.Full if the queue is full
q.put(item)         # put data; blocks and waits while the queue is full
q.get_nowait()      # get data without blocking; raises queue.Empty if the queue is empty
q.get()             # get data; blocks and waits while the queue is empty
q.qsize()           # get the number of items currently in the queue
q.join()            # blocks the calling (e.g. main) thread while the unfinished-task count is not 0,
                    # and lets it continue once the count reaches 0; used together with task_done()
                    # put() makes the count +1, task_done() makes the count -1
q.task_done()       # mark one item fetched with get() as finished (count -1)
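A small sketch of how the put()/task_done()/join() counter works in practice (the worker function and item values are just illustrative):

from queue import Queue
from threading import Thread

q = Queue()

def worker():
    while True:
        item = q.get()        # blocks until an item is available
        print("processing", item)
        q.task_done()         # count -1: this item is now finished

Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)                  # count +1 per item

q.join()                      # blocks until task_done() has been called once per put()
print("all items processed, main thread continues")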

1.3 Benefits of Using Queue

When I studied threads earlier, multiple threads competed for shared resources. To resolve that contention I used locks and thread synchronization, which solved the problem but reduced efficiency. A message queue resolves the contention while keeping things efficient, because the queue itself handles the synchronization between threads.

2. Producers and consumers

In software development, it is common to have some modules produce data while other modules process it (a module here could be a function, a thread, a process, and so on). The module that produces the data is called the producer, and the module that processes the data is called the consumer.

Advantages of the producer-consumer model:

  • Decoupling (the two tasks are not directly coupled: each only does its own job, without depending on the other)
  • Concurrency: producer and consumer run at the same time, improving efficiency

Code examples:

# -*- coding: utf-8 -*-
import time
from queue import Queue
from threading import Thread

data_queue = Queue()

# producer
def write_data():
    for i in range(10):
        data_queue.put(i)
        print("The producer produced the data as:", i)
        time.sleep(0.5)
    print("The producer is finished.")

# consumer
def read_data():
    while True:
        if data_queue.qsize():
            data = data_queue.get()
            print("Consumer spending data is:", data)
            time.sleep(1)
            data_queue.task_done()
        else:
            break


if __name__ == '__main__':
    write_thread = Thread(target=write_data, daemon=True)
    read_thread = Thread(target=read_data, daemon=True)
    write_thread.start()
    read_thread.start()
    data_queue.join()
    print("End of program")


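One caveat: the consumer above polls qsize() and breaks as soon as the queue looks empty, which is a race; if it happens to run before the producer has put anything, it exits immediately. A sketch of an alternative (the same pattern the multi-threaded crawler below uses): block on get(), call task_done() for every item, and let data_queue.join() in the main thread plus daemon=True end the program. This is a drop-in replacement for read_data() in the example above:

# consumer without the qsize() race: relies on get() blocking and queue.join()
def read_data():
    while True:
        data = data_queue.get()            # blocks until the producer puts something
        print("Consumer spending data is:", data)
        data_queue.task_done()             # lets data_queue.join() in main return once all items are done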

3. Single-threaded crawler

# -*- coding: utf-8 -*-
import time
import requests

from lxml import etree


class Spider(object):
    def __init__(self):
        # list that stores the level-1 links
        self.all_url_list = []
        # list that stores the Element object converted from each URL
        self.html_list = []

    def get_all_url(self):
        """Combine each URL that needs a network request."""
        for i in range(1, 20):
            self.all_url_list.append("http://www.1ppt.com/xiazai/zongjie/ppt_zongjie_{}.html".format(i))

    def get_html(self, url):
        """Send a request to the URL, get the response, and convert it to an Element object."""
        print("Request:", url)
        response = requests.get(url)
        html = etree.HTML(response.content)
        self.html_list.append(html)
        print("Finished")

    def get_data(self, html):
        """Get the data from each Element object and print it."""
        a_list = html.xpath('//ul[@class="tplist"]//h2/a')
        for i in a_list:
            item = {}
            item['title'] = i.xpath('./text()')[0]
            item['url'] = i.xpath('./@href')[0]
            print(item)

    def run(self):
        # collect the level-1 links that need to be crawled
        self.get_all_url()
        # iterate over the URLs, request each one, convert the response to an Element object and add it to html_list
        for i in self.all_url_list:
            self.get_html(i)
        # iterate over html_list and extract the data we need from each Element object
        for html in self.html_list:
            self.get_data(html)


if __name__ == '__main__':
    # crawler startup time
    start_time = time.time()
    spider = Spider()
    spider.run()
    # crawler end time
    end_time = time.time()
    # total crawler time
    print("Total crawler time", end_time - start_time)
    # Total crawler time: 32.52687358856201


4. Multi-threaded crawler

# -*- coding: utf-8 -*-
import time
import requests

from lxml import etree
from threading import Thread
from queue import Queue


class Spider(object):
    def __init__(self):
        # queue that stores the level-1 links
        self.all_url_queue = Queue()
        # queue that stores the Element object converted from each URL
        self.html_queue = Queue()

    def get_all_url(self):
        """Combine each URL that needs a network request."""
        for i in range(1, 20):
            self.all_url_queue.put("http://www.1ppt.com/xiazai/zongjie/ppt_zongjie_{}.html".format(i))

    def get_html(self):
        """Send a request to the URL, get the response, and convert it to an Element object."""
        while True:
            url = self.all_url_queue.get()
            print("Request:", url)
            response = requests.get(url)
            html = etree.HTML(response.content)
            self.html_queue.put(html)
            self.all_url_queue.task_done()
            print("Finished")

    def get_data(self):
        """Get the data from each Element object and print it."""
        while True:
            html = self.html_queue.get()
            a_list = html.xpath('//ul[@class="tplist"]//h2/a')
            for i in a_list:
                item = {}
                item['title'] = i.xpath('./text()')[0]
                item['url'] = i.xpath('./@href')[0]
                print(item)
            self.html_queue.task_done()

    def run(self):
        # list that holds all the threads we create
        thread_list = []
        # collect the level-1 links that need to be crawled
        self.get_all_url()
        # create 5 threads to process data in all_url_queue and add them to the thread list
        for i in range(5):
            get_html_thread = Thread(target=self.get_html, daemon=True)
            thread_list.append(get_html_thread)
        # create 3 threads to process data in html_queue and add them to the thread list
        for i in range(3):
            get_data_thread = Thread(target=self.get_data, daemon=True)
            thread_list.append(get_data_thread)
        # start all the threads
        for _thread in thread_list:
            _thread.start()
        # the worker threads are daemons and loop forever, so they never signal that the work is done;
        # instead, block the main thread on the queues so it only continues (and the program exits safely)
        # once every task has been marked with task_done()
        self.all_url_queue.join()
        self.html_queue.join()
        print("Crawler finished running.")


if __name__ == '__main__':
    # crawler startup time
    start_time = time.time()
    spider = Spider()
    spider.run()
    # crawler end time
    end_time = time.time()
    # total crawler time
    print("Total crawler time", end_time - start_time)
    # Total crawler time: 1.4571187496185303


5. Multi-process crawler

# -*- coding: utf-8 -*-
import time
import requests

from lxml import etree
from multiprocessing import Process, JoinableQueue


class Spider(object):
    def __init__(self):
        # queue that stores the level-1 links
        self.all_url_queue = JoinableQueue()
        # queue that stores the response body of each requested URL
        self.response_queue = JoinableQueue()

    def get_all_url(self):
        """Combine each URL that needs a network request."""
        for i in range(1, 20):
            self.all_url_queue.put("http://www.1ppt.com/xiazai/zongjie/ppt_zongjie_{}.html".format(i))

    def get_html(self):
        """Send a request to the URL and put the raw response body on the queue
        (the bytes are queued because lxml Element objects can't be pickled across processes)."""
        while True:
            url = self.all_url_queue.get()
            print("Request:", url)
            response = requests.get(url)
            # html = etree.HTML(response.content)
            self.response_queue.put(response.content)
            self.all_url_queue.task_done()
            print("Finished")

    def get_data(self):
        """Convert each response body to an Element object and print the extracted data."""
        while True:
            # html = self.html_queue.get()
            html = etree.HTML(self.response_queue.get())
            a_list = html.xpath('//ul[@class="tplist"]//h2/a')
            for i in a_list:
                item = {}
                item['title'] = i.xpath('./text()')[0]
                item['url'] = i.xpath('./@href')[0]
                print(item)
            self.response_queue.task_done()

    def run(self):
        # list that holds all the processes we create
        process_list = []
        # collect the level-1 links that need to be crawled
        self.get_all_url()
        # create 10 processes to process data in all_url_queue and add them to the process list
        for i in range(10):
            get_html_process = Process(target=self.get_html, daemon=True)
            process_list.append(get_html_process)
        # create 3 processes to process data in response_queue and add them to the process list
        for i in range(3):
            get_data_process = Process(target=self.get_data, daemon=True)
            process_list.append(get_data_process)
        # start all the processes
        for _process in process_list:
            _process.start()
        # the worker processes are daemons and loop forever; block the main process on the queues
        # instead, so it only continues (and the program exits safely) once every task has been marked task_done()
        self.all_url_queue.join()
        self.response_queue.join()
        print("Crawler finished running.")


if __name__ == '__main__':
    # crawler startup time
    start_time = time.time()
    spider = Spider()
    spider.run()
    # crawler end time
    end_time = time.time()
    # total crawler time
    print("Total crawler time", end_time - start_time)
    # Total crawler time: 4.6992528438568115


Conclusion

This article is quite long, so a big thumbs up to everyone who read to the end! Given the author's limited experience, mistakes are inevitable; feedback and corrections are very welcome.

If you find this article helpful, please like, comment, and bookmark it

Your support is my biggest motivation!!