Preface
What I want to do today is crawl the air quality data for every city in the country. Don't ask why; it's for a project.
Requirements analysis
We want to crawl the information for every city listed on www.tianqihoubao.com/aqi/. When you click into a city page, the air quality data is laid out in a few boxes, and all we have to do is save the contents of those boxes.
Extraction and analysis
Now that the requirements are clear, the next step is to extract this information. First we analyze the home page, which lists the cities for the whole country.
Get national cities
Right-click a province entry and inspect the element, and you will locate it in no time. At that point we can write an XPath expression to extract the information, but first let's design the storage format for the home page data.
So in this case we define a dictionary that stores the data like this:
{" hebei ":[(XXX city,url),(),()], "Jiangxi ":[(),()]}Copy the code
We will eventually save the data to an Excel spreadsheet grouped by province, which is why we need this classification, but I'll cover that part later; I don't want to commit to it too early in case the requirements change.
So first we extract the province nodes:
Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")
Then, within each province, extract the corresponding city names and links:
Province_citys_link = Province.xpath("""./dd/a/@href""")
Province_citys_name = Province.xpath("""./dd/a/text()""")
Extracting quality information
Then there is the question of how to extract the air quality itself. Click one of the city links, for example www.tianqihoubao.com/aqi/shijiaz… and inspect the page again. We quickly locate the AQI value, and expanding the node shows that the rest of the information is hidden under a div tag, so we extract it the same way:
AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
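Putting those two expressions together, a minimal synchronous sketch of parsing a single city page might look like this (the function and its parameter are just illustrative; the full crawler below does the same thing asynchronously and assumes the same GBK encoding):

import requests
from lxml import etree

def fetch_city_quality(city_url):
    # city_url is any city link taken from the home page; the exact link is up to you.
    response = requests.get(city_url)
    html = etree.HTML(response.content.decode("gbk"))  # the site serves GBK-encoded pages
    aqi = html.xpath('//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()')[0].strip()
    details = html.xpath('//*[@id="today-quality"]/div[2]/ul/li/text()')
    return aqi, [item.strip() for item in details]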
Coding
So the next step is coding. Since this is a small project, the project structure is simple; the Spider package here is just one module of the project.
Information storage
To make parsing and storage easier, I defined a POJO-style entity class to hold our data.
class AIRPojo(object):
    __AQI = None
    __PM25 = None
    __CO = None
    __SO2 = None
    __PM10 = None
    __O3 = None
    __NO2 = None
    __City = None
    __Province = None
    __COUNT = 0

    def get_AQI(self):
        if self.__AQI:
            return self.__AQI

    def get_PM25(self):
        if self.__PM25:
            return self.__PM25

    def get_CO(self):
        if self.__CO:
            return self.__CO

    def get_SO2(self):
        if self.__SO2:
            return self.__SO2

    def get_PM10(self):
        if self.__PM10:
            return self.__PM10

    def get_O3(self):
        if self.__O3:
            return self.__O3

    def get_NO2(self):
        if self.__NO2:
            return self.__NO2

    def get_City(self):
        if self.__City:
            return self.__City

    def get_Province(self):
        if self.__Province:
            return self.__Province

    def set_AQI(self, AQI):
        if self.__COUNT == 0:
            self.__AQI = AQI
            self.__COUNT += 1

    def set_PM25(self, PM25):
        if self.__COUNT == 1:
            self.__PM25 = PM25
            self.__COUNT += 1

    def set_CO(self, CO):
        if self.__COUNT == 2:
            self.__CO = CO
            self.__COUNT += 1

    def set_SO2(self, SO2):
        if self.__COUNT == 3:
            self.__SO2 = SO2
            self.__COUNT += 1

    def set_PM10(self, PM10):
        if self.__COUNT == 4:
            self.__PM10 = PM10
            self.__COUNT += 1

    def set_O3(self, O3):
        if self.__COUNT == 5:
            self.__O3 = O3
            self.__COUNT += 1

    def set_NO2(self, NO2):
        if self.__COUNT == 6:
            self.__NO2 = NO2
            self.__COUNT += 1

    def set_City(self, City):
        if self.__COUNT == 7:
            self.__City = City
            self.__COUNT += 1

    def set_Province(self, Province):
        if self.__COUNT == 8:
            self.__Province = Province
            self.__COUNT += 1

    def __str__(self):
        if self.__COUNT >= 9:  # all nine fields have been set
            return ("AQI:" + self.__AQI + "-PM2.5:" + self.__PM25 + "-CO:" + self.__CO
                    + "-SO2:" + self.__SO2 + "-PM10:" + self.__PM10 + "-O3:" + self.__O3
                    + "-NO2:" + self.__NO2 + "-City:" + self.__City + "-Province:" + self.__Province)
        else:
            return "Data not saved, cannot output all results."


if __name__ == '__main__':
    air = AIRPojo()
    print(air)
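One quirk worth noting: the setters use the private __COUNT field to enforce a fixed fill order (AQI, PM2.5, CO, SO2, PM10, O3, NO2, city, province), and a setter called out of turn is silently ignored. A quick sketch of the intended usage, with made-up values:

air = AIRPojo()
air.set_AQI("57")              # values here are made up for illustration
air.set_PM25("35")
air.set_CO("0.6")
air.set_SO2("8")
air.set_PM10("70")
air.set_O3("120")
air.set_NO2("30")
air.set_City("SomeCity")
air.set_Province("SomeProvince")
print(air)                     # all nine fields are set, so the full record is printed

broken = AIRPojo()
broken.set_PM25("35")          # ignored: AQI was never set, so __COUNT is still 0
print(broken.get_PM25())       # None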
Writing Java-style getters and setters still feels comfortable, but I can't help liking Python's coroutines.
Log output
Since this is a project after all, there are a few settings files, but right now we only have one. Of course, since the data source is fixed, there is no need to put it in the configuration; if the source changed, the parser would break anyway.
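The crawler below does `from Spider.Setting.Settings import *` and only ever reads LOG_LEVEL (it checks for the values "ALL" and "NONE"), so a minimal Settings.py is assumed to look roughly like this:

# Spider/Setting/Settings.py -- minimal sketch, contents assumed from how it is used
# "ALL" prints every completed task and timeout retry; other values keep the crawler quiet.
LOG_LEVEL = "ALL"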
Writing the crawler
Home page city crawl
First of all, if we want to crawl all the information, we obviously need to analyze the home page first so that we can crawl the whole site from there.
Here I only need to crawl the home page once, so instead of writing an asynchronous request I just use requests directly, which also lets us see how fast the site responds. After all, we are scraping a website rather than calling an API, so it is bound to be a little slower.
This part is pretty easy:
def get_provinces(self):
    response = requests.get(url=self.rootUrl)
    response_data = response.content.decode(self.encoding)
    html = self.parse.HTML(response_data)
    Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")
    for Province in Provinces:
        temp = list()
        Province_citys_link = Province.xpath("""./dd/a/@href""")
        Province_citys_name = Province.xpath("""./dd/a/text()""")
        for city_link, city_name in zip(Province_citys_link, Province_citys_name):
            temp.append((self.baseUrl + city_link, city_name))
        self.all_provinces[Province.xpath("./dt/b/text()")[0]] = temp
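After get_provinces() runs, self.all_provinces maps each province name to a list of (city_url, city_name) tuples, roughly like this (the names and link are placeholders, not real scraped values):

# Shape of self.all_provinces after get_provinces(); values are illustrative only.
{
    "SomeProvince": [
        ("http://www.tianqihoubao.com/aqi/somecity.html", "SomeCity"),
        # ... one tuple per city in the province
    ],
    # ... one key per province on the home page
}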
With that, all the information I need is saved. I also found that the site really is slow to crawl, roughly 2 seconds per response, so the 279 city requests definitely need concurrency. Coroutines and asynchronous requests are the way to go here; opening threads or a thread pool brings higher resource consumption and read/write safety problems that force you to add locks, and can easily max out a single core, so the performance just isn't there.
Asynchronous crawl
So this is where asynchrony comes in. There are two parts: an asynchronous request and a callback function that parses the result.
async def fireCity(self, province, url, city, semaphore):
    # Four parameters: the province being crawled, the city page URL, the city
    # name, and the semaphore that limits how many requests run at once.
    async with semaphore:
        timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=timeout) as response:
                    data = await response.text(encoding=self.encoding)
                    return (data, province, city, url)
        except Exception as e:
            # On timeout (or any other error) remember the task so it can be retried later.
            self.timeout_task.append((province, url, city, semaphore))
def parse_city_quality(self, task):
    if task.result():
        data, province, city, url = task.result()
        html = self.parse.HTML(data)
        airPojo = AIRPojo()
        AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
        airPojo.set_AQI(AQI)
        ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
        airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
        airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
        airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
        airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
        airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
        airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
        airPojo.set_City(city)
        airPojo.set_Province(province)
        self.TIMES += 1  # count one more city whose air quality we have obtained
        if LOG_LEVEL == "ALL":
            print("Currently completed task", self.TIMES, airPojo, url)
    else:
        pass
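The order of the set_* calls above matters because the code assumes ALL_Info is the list of "label: value" strings from the page, in the order PM2.5, CO, SO2, PM10, O3, NO2, and each split(":")[1].strip() keeps only the value. Roughly (the numbers are made up):

# Illustrative shape of ALL_Info; the actual text comes from the page.
ALL_Info = ["PM2.5: 35", "CO: 0.6", "SO2: 8", "PM10: 70", "O3: 120", "NO2: 30"]
ALL_Info[0].split(":")[1].strip()   # -> "35"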
Timeout problems
Earlier we said that access is slow, so some requests are bound to time out, and those tasks need to be re-run. But the batch of tasks we hand to asyncio.wait cannot have new tasks added once it is running, otherwise we could simply put the timed-out tasks back in. To work around this, I created a separate timeout queue: whenever a request times out, the task goes into this queue. After the first batch of tasks finishes, we check whether anything timed out, and if so we rebuild those tasks:
def check(self):
    while self.timeout_task:
        if LOG_LEVEL == "ALL":
            print("Handling timeout connection", len(self.timeout_task))
        tasks = []
        while self.timeout_task:
            province, url, city, semaphore = self.timeout_task.pop(0)
            c = self.fireCity(province, url, city, semaphore)
            task = asyncio.ensure_future(c)
            task.add_done_callback(self.parse_city_quality)
            tasks.append(task)
        self.loop.run_until_complete(asyncio.wait(tasks))
We rebuild and re-run the timed-out tasks until none are left. The timeout queue itself is just implemented with a plain Python list.
Full crawler code
import time
import aiohttp
import asyncio
import requests
from lxml import etree
import threading
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Setting.Settings import *


class AIR_Quality(object):

    def __init__(self):
        self.all_provinces = {}
        self.headers = {
            "Referer": "http://www.tianqihoubao.com/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36"
        }
        self.parse = etree
        self.encoding = "gbk"
        self.baseUrl = "http://www.tianqihoubao.com"
        self.rootUrl = "http://www.tianqihoubao.com/aqi"
        self.TIMES = 0
        self.PoolSize = 500  # upper limit on concurrent requests
        self.tasks = []  # task queue
        self.timeout_task = []  # timeout queue
        self.loop = None
        self.TasKLength = 0

    def __now_thread(self) -> bool:
        """Check whether the current thread is the main thread."""
        current_name = threading.current_thread().getName()
        if current_name == "MainThread":
            return True
        return False

    def get_provinces(self):
        response = requests.get(url=self.rootUrl)
        response_data = response.content.decode(self.encoding)
        html = self.parse.HTML(response_data)
        Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")
        for Province in Provinces:
            temp = list()
            Province_citys_link = Province.xpath("""./dd/a/@href""")
            Province_citys_name = Province.xpath("""./dd/a/text()""")
            for city_link, city_name in zip(Province_citys_link, Province_citys_name):
                temp.append((self.baseUrl + city_link, city_name))
            self.all_provinces[Province.xpath("./dt/b/text()")[0]] = temp
        # Leave an internal hook here for saving the data later

    def parse_city_quality(self, task):
        if task.result():
            data, province, city, url = task.result()
            html = self.parse.HTML(data)
            airPojo = AIRPojo()
            AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
            airPojo.set_AQI(AQI)
            ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
            airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
            airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
            airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
            airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
            airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
            airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
            airPojo.set_City(city)
            airPojo.set_Province(province)
            self.TIMES += 1  # count one more city whose air quality we have obtained
            if LOG_LEVEL == "ALL":
                print("Currently completed task", self.TIMES, airPojo, url)
        else:
            pass

    async def fireCity(self, province, url, city, semaphore):
        # Four parameters: the province being crawled, the city page URL, the city
        # name, and the semaphore that limits how many requests run at once.
        async with semaphore:
            timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, timeout=timeout) as response:
                        data = await response.text(encoding=self.encoding)
                        return (data, province, city, url)
            except Exception as e:
                self.timeout_task.append((province, url, city, semaphore))

    def check(self):
        while self.timeout_task:
            if LOG_LEVEL == "ALL":
                print("Handling timeout connection", len(self.timeout_task))
            tasks = []
            while self.timeout_task:
                province, url, city, semaphore = self.timeout_task.pop(0)
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                tasks.append(task)
            self.loop.run_until_complete(asyncio.wait(tasks))

    def run(self):
        start = time.time()
        if not self.all_provinces:
            self.get_provinces()
        semaphore = None
        if self.__now_thread():
            self.loop = asyncio.get_event_loop()
            semaphore = asyncio.Semaphore(self.PoolSize)
        else:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            semaphore = asyncio.Semaphore(self.PoolSize)
        # Create the asynchronous task queue
        for province in self.all_provinces.keys():
            citys_info = self.all_provinces.get(province)
            for city_info in citys_info:
                url_marks, city = city_info
                url = ""
                for url_mark in url_marks.split():  # drop any stray whitespace in the link
                    url += url_mark
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                self.tasks.append(task)
        self.TasKLength = len(self.tasks)
        self.loop.run_until_complete(asyncio.wait(self.tasks))
        self.check()
        if LOG_LEVEL == "ALL" or LOG_LEVEL == "NONE":
            print("Time:", time.time() - start, "Seconds")
            print("Total tasks:", self.TasKLength)
            print("Quantity completed:", self.TIMES, "Remaining timeout tasks:", len(self.timeout_task))


if __name__ == '__main__':
    start = time.time()
    air_quality = AIR_Quality()
    air_quality.get_provinces()
    # print(air_quality.all_provinces)
    air_quality.run()
Test
The data looks OK, so let's leave it at that for now; I still haven't done much algorithm practice for the Blue Bridge Cup, fingers crossed ~