1. Use of the Queue message queue
This is the 8th day of my participation in the Gwen Challenge.
1.1 Functions of Queue
1. Python's Queue module provides a FIFO (first in, first out) implementation for multithreaded programming.
2. It passes data and messages between multiple threads; several threads can share the same Queue instance.
3. The size of the queue (the number of elements it holds) can be used to limit memory usage.
4. A queue can also transfer data and messages between multiple processes (via multiprocessing).
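As a quick illustration of points 1 and 3 (a minimal sketch, not from the original article): items come out of a Queue in the order they were put in, and maxsize caps how many items the queue will hold.

```python
from queue import Queue

q = Queue(maxsize=3)              # at most 3 items may sit in the queue
for i in range(3):
    q.put(i)

print(q.full())                   # True: the queue has reached maxsize
print(q.get(), q.get(), q.get())  # 0 1 2 -- first in, first out
```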
1.2 Use of Queue
1.2.1 Importing Queue
from queue import Queue # applies to threads and coroutines
from multiprocessing import JoinableQueue as Queue # used for processes
1.2.2 Common Queue methods
q = Queue(maxsize=100)  # maxsize limits the queue length
item = {}
q.put_nowait(item)  # put data without waiting; raises queue.Full if the queue is full
q.put(item)         # put data; blocks and waits when the queue is full
q.get_nowait()      # get data without waiting; raises queue.Empty if the queue is empty
q.get()             # get data; blocks and waits when the queue is empty
q.qsize()           # number of items currently in the queue
q.join()            # blocks while the unfinished-task count is not 0; continues once it reaches 0
# q.join() blocks the calling thread and is used together with task_done():
# put() increases the count by 1, task_done() decreases it by 1;
# when the count reaches 0, join() stops blocking and the calling thread continues.
q.task_done()       # mark one fetched task as done (decreases the count by 1)
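A minimal sketch (an assumed example, not code from the article) of the counting behaviour described above: every put() raises the unfinished-task count by 1, every task_done() lowers it by 1, and join() only returns once the count is back to 0.

```python
from queue import Queue
from threading import Thread

q = Queue()
for i in range(3):
    q.put(i)               # unfinished-task count: 1, 2, 3

def worker():
    while True:
        item = q.get()     # fetching alone does not lower the count
        print("processed", item)
        q.task_done()      # count drops by 1

Thread(target=worker, daemon=True).start()
q.join()                   # returns only after three task_done() calls
print("all tasks done")
```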
1.3 Benefits of Using Queue
When I was learning threads earlier, multiple threads would compete for shared resources. To solve that competition I used locks and thread synchronization, which worked but reduced efficiency. A message queue resolves the contention while keeping the threads efficient.
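As a rough illustration (an assumed sketch, not code from the article): several threads can write into one shared Queue without any explicit Lock, because queue.Queue handles the locking internally.

```python
from queue import Queue
from threading import Thread

results = Queue()

def worker(worker_id):
    # Each thread pushes its own results; no Lock is needed around put().
    for n in range(3):
        results.put((worker_id, n))

threads = [Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

while not results.empty():
    print(results.get())
```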
2. Producers and consumers
In software development, it is common to have some modules that produce data and other modules that process it (a module here could be a function, a thread, a process, and so on). The module that produces the data is called the producer, and the module that processes the data is called the consumer.
Advantages of the producer-consumer model:
- Decoupling (the two sides are not directly tied to each other; each one only does its own work)
- Concurrency improves efficiency
Code examples:
# -*- coding: utf-8 -*-
import time
from queue import Queue
from threading import Thread

data_queue = Queue()


# producer
def write_data():
    for i in range(10):
        data_queue.put(i)
        print("The producer produced:", i)
        time.sleep(0.5)
    print("The producer is finished.")


# consumer
def read_data():
    while True:
        if data_queue.qsize():
            data = data_queue.get()
            print("The consumer consumed:", data)
            time.sleep(1)
            data_queue.task_done()
        else:
            # stop once the queue is empty
            break


if __name__ == '__main__':
    write_thread = Thread(target=write_data, daemon=True)
    read_thread = Thread(target=read_data, daemon=True)
    write_thread.start()
    read_thread.start()
    # block the main thread until every put() has a matching task_done()
    data_queue.join()
    print("End of program")
3. Single-threaded crawler
# -*- coding: utf-8 -*-
import time
import requests
from lxml import etree


class Spider(object):
    def __init__(self):
        # List that stores the level-1 links
        self.all_url_list = []
        # List that stores the Element object converted from each URL
        self.html_list = []

    def get_all_url(self):
        """Build every URL that needs a network request."""
        for i in range(1, 20):
            self.all_url_list.append("http://www.1ppt.com/xiazai/zongjie/ppt_zongjie_{}.html".format(i))

    def get_html(self, url):
        """Send a request to the URL, get the response, and convert it to an Element object."""
        print("Request:", url)
        response = requests.get(url)
        html = etree.HTML(response.content)
        self.html_list.append(html)
        print("Finished")

    def get_data(self, html):
        """Extract the data from each Element object and print it."""
        a_list = html.xpath('//ul[@class="tplist"]//h2/a')
        for i in a_list:
            item = {}
            item['title'] = i.xpath('./text()')[0]
            item['url'] = i.xpath('./@href')[0]
            print(item)

    def run(self):
        # Collect the level-1 links to crawl
        self.get_all_url()
        # Request each URL and store the resulting Element object in html_list
        for i in self.all_url_list:
            self.get_html(i)
        # Extract the data we need from each Element object
        for html in self.html_list:
            self.get_data(html)


if __name__ == '__main__':
    # crawler start time
    start_time = time.time()
    spider = Spider()
    spider.run()
    # crawler end time
    end_time = time.time()
    # total crawler time
    print("Total crawler time:", end_time - start_time)
    # Output: Total crawler time: 32.52687358856201
4. Multi-threaded crawler
# -*- coding: utf-8 -*-
import time
import requests
from lxml import etree
from threading import Thread
from queue import Queue


class Spider(object):
    def __init__(self):
        # Queue that stores the level-1 links
        self.all_url_queue = Queue()
        # Queue that stores the Element object converted from each URL
        self.html_queue = Queue()

    def get_all_url(self):
        """Build every URL that needs a network request."""
        for i in range(1, 20):
            self.all_url_queue.put("http://www.1ppt.com/xiazai/zongjie/ppt_zongjie_{}.html".format(i))

    def get_html(self):
        """Send a request to the URL, get the response, and convert it to an Element object."""
        while True:
            url = self.all_url_queue.get()
            print("Request:", url)
            response = requests.get(url)
            html = etree.HTML(response.content)
            self.html_queue.put(html)
            self.all_url_queue.task_done()
            print("Finished")

    def get_data(self):
        """Extract the data from each Element object and print it."""
        while True:
            html = self.html_queue.get()
            a_list = html.xpath('//ul[@class="tplist"]//h2/a')
            for i in a_list:
                item = {}
                item['title'] = i.xpath('./text()')[0]
                item['url'] = i.xpath('./@href')[0]
                print(item)
            self.html_queue.task_done()

    def run(self):
        # List that holds all created threads
        thread_list = []
        # Collect the level-1 links to crawl
        self.get_all_url()
        # Create 5 threads to process the data in all_url_queue and add them to the thread list
        for i in range(5):
            get_html_thread = Thread(target=self.get_html, daemon=True)
            thread_list.append(get_html_thread)
        # Create 3 threads to process the data in html_queue and add them to the thread list
        for i in range(3):
            get_data_thread = Thread(target=self.get_data, daemon=True)
            thread_list.append(get_data_thread)
        # Start every thread in the list
        for _thread in thread_list:
            _thread.start()
        # The daemon worker threads do not keep the main thread alive, so block the main
        # thread on the queues; once both counts reach 0 all tasks are done and the
        # program can exit safely.
        self.all_url_queue.join()
        self.html_queue.join()
        print("Crawler finished running.")


if __name__ == '__main__':
    # crawler start time
    start_time = time.time()
    spider = Spider()
    spider.run()
    # crawler end time
    end_time = time.time()
    # total crawler time
    print("Total crawler time:", end_time - start_time)
    # Output: Total crawler time: 1.4571187496185303
5. Multi-process crawler
# -*- coding: utf-8 -*-
import time
import requests
from lxml import etree
from multiprocessing import Process, JoinableQueue


class Spider(object):
    def __init__(self):
        # Queue that stores the level-1 links
        self.all_url_queue = JoinableQueue()
        # Queue that stores the response data of each requested URL
        self.response_queue = JoinableQueue()

    def get_all_url(self):
        """Build every URL that needs a network request."""
        for i in range(1, 20):
            self.all_url_queue.put("http://www.1ppt.com/xiazai/zongjie/ppt_zongjie_{}.html".format(i))

    def get_html(self):
        """Send a request to the URL and put the raw response content on the queue."""
        while True:
            url = self.all_url_queue.get()
            print("Request:", url)
            response = requests.get(url)
            # Element objects cannot be pickled and sent between processes,
            # so the raw bytes are queued and parsed in get_data() instead.
            self.response_queue.put(response.content)
            self.all_url_queue.task_done()
            print("Finished")

    def get_data(self):
        """Convert each response to an Element object, extract the data, and print it."""
        while True:
            html = etree.HTML(self.response_queue.get())
            a_list = html.xpath('//ul[@class="tplist"]//h2/a')
            for i in a_list:
                item = {}
                item['title'] = i.xpath('./text()')[0]
                item['url'] = i.xpath('./@href')[0]
                print(item)
            self.response_queue.task_done()

    def run(self):
        # List that holds all created processes
        process_list = []
        # Collect the level-1 links to crawl
        self.get_all_url()
        # Create 10 processes to handle the data in all_url_queue and add them to the process list
        for i in range(10):
            get_html_process = Process(target=self.get_html, daemon=True)
            process_list.append(get_html_process)
        # Create 3 processes to handle the data in response_queue and add them to the process list
        for i in range(3):
            get_data_process = Process(target=self.get_data, daemon=True)
            process_list.append(get_data_process)
        # Start every process in the list
        for _process in process_list:
            _process.start()
        # The daemon worker processes do not keep the main process alive, so block the
        # main process on the queues; once both counts reach 0 all tasks are done and
        # the program can exit safely.
        self.all_url_queue.join()
        self.response_queue.join()
        print("Crawler finished running.")


if __name__ == '__main__':
    # crawler start time
    start_time = time.time()
    spider = Spider()
    spider.run()
    # crawler end time
    end_time = time.time()
    # total crawler time
    print("Total crawler time:", end_time - start_time)
    # Output: Total crawler time: 4.6992528438568115
Conclusion
This article is quite long, so a big thumbs-up to everyone who read to the end! Given the author's limited level, mistakes are inevitable, and feedback and corrections are welcome.
If you find this article helpful, please like, comment, and bookmark it.
Your support is my biggest motivation!