Preface

What I want to do today is crawl air quality data for the whole country. Don't ask why; just treat it as a little project.

Requirements analysis

The task is to crawl all the city information on www.tianqihoubao.com/aqi/. When you click into a city page, each air quality reading is displayed in its own box; all we have to do is save all of those boxes.

Extraction and analysis

Now that the requirements are clear, the next step is to extract this information. First of all, we need to analyze the home page to get the list of cities nationwide.

Getting the national city list

Inspect the element in the browser and you'll locate it in no time. At that point we can write an XPath expression to extract the information, but first let's design the storage format for the home-page data.

So in this case we're going to define a dictionary that stores the data like this:

{" hebei ":[(XXX city,url),(),()], "Jiangxi ":[(),()]}Copy the code

We're eventually going to save this into an Excel spreadsheet, grouped by province. I'll hold off on writing that part, though: if the requirements change later, rewriting it would be painful.
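For reference, here is a minimal sketch of what that save step could look like with openpyxl; the library choice, the function name save_to_excel, and the flat one-row-per-city layout are all my assumptions, not the project's actual code.

from openpyxl import Workbook  # assumed dependency, not actually part of the project

def save_to_excel(all_provinces, path="air_quality.xlsx"):
    # all_provinces maps province -> list of (city, url) tuples as sketched above
    # (note: get_provinces below actually stores (url, city), so swap the unpacking there)
    wb = Workbook()
    ws = wb.active
    ws.append(["Province", "City", "URL"])  # header row
    for province, citys in all_provinces.items():
        for city, url in citys:
            ws.append([province, city, url])
    wb.save(path)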

So first we extract the names of the provinces:

Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

Then extract the corresponding city names and links:

Province_citys_link = Province.xpath("""./dd/a/@href""")
Province_citys_name = Province.xpath("""./dd/a/text()""")

Extracting quality information

Then there is the question of how to extract the air quality itself. Click into a city link to check it out: www.tianqihoubao.com/aqi/shijiaz… Once again we inspect the element, and we quickly locate the AQI; expanding the markup shows that the rest of the readings sit under a div tag, so we extract them the same way.

AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()

ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
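Each li here carries text of the form "pollutant: value" (that's my reading of the page, so treat the exact strings as an assumption), which is why the parser later splits each entry on the colon:

# Illustrative only: assumed shape of the <li> texts, split the same way
# parse_city_quality does further down
sample = ["PM2.5: 35", "CO: 0.6", "SO2: 7", "PM10: 60", "O3: 120", "NO2: 18"]
values = [item.split(":")[1].strip() for item in sample]
print(values)  # ['35', '0.6', '7', '60', '120', '18']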

Coding

So the next step is coding. Since this is a small project, the structure is simple; the Spider package here is just one module of the project.

Information storage

To make parsing and storage easier, I defined a POJO-style entity class to hold our data.

class AIRPojo(object):
    __AQI = None
    __PM25 = None
    __CO = None
    __SO2 = None
    __PM10 = None
    __O3 = None
    __NO2 = None
    __City = None
    __Province = None
    __COUNT = 0

    def get_AQI(self):
        if (self.__AQI):
            return self.__AQI

    def get_PM25(self):
        if (self.__PM25):
            return self.__PM25

    def get_CO(self):
        if (self.__CO):
            return self.__CO

    def get_SO2(self):
        if (self.__SO2):
            return self.__SO2

    def get_PM10(self):
        if (self.__PM10):
            return self.__PM10

    def get_O3(self):
        if (self.__O3):
            return self.__O3

    def get_NO2(self):
        if (self.__NO2):
            return self.__NO2

    def get_City(self):
        if (self.__City):
            return self.__City

    def get_Province(self):
        if (self.__Province):
            return self.__Province

    # The setters enforce the fill-in order AQI -> PM2.5 -> CO -> SO2 ->
    # PM10 -> O3 -> NO2 -> City -> Province via the __COUNT counter
    def set_AQI(self, AQI):
        if (self.__COUNT == 0):
            self.__AQI = AQI
            self.__COUNT += 1

    def set_PM25(self, PM25):
        if (self.__COUNT == 1):
            self.__PM25 = PM25
            self.__COUNT += 1

    def set_CO(self, CO):
        if (self.__COUNT == 2):
            self.__CO = CO
            self.__COUNT += 1

    def set_SO2(self, SO2):
        if (self.__COUNT == 3):
            self.__SO2 = SO2
            self.__COUNT += 1

    def set_PM10(self, PM10):
        if (self.__COUNT == 4):
            self.__PM10 = PM10
            self.__COUNT += 1

    def set_O3(self, O3):
        if (self.__COUNT == 5):
            self.__O3 = O3
            self.__COUNT += 1

    def set_NO2(self, NO2):
        if (self.__COUNT == 6):
            self.__NO2 = NO2
            self.__COUNT += 1

    def set_City(self, City):
        if (self.__COUNT == 7):
            self.__City = City
            self.__COUNT += 1

    def set_Province(self, Province):
        if (self.__COUNT == 8):
            self.__Province = Province
            self.__COUNT += 1

    def __str__(self):
        # All nine fields are set once __COUNT reaches 9 (the original >=8
        # would let a record without Province through and crash the concatenation)
        if (self.__COUNT >= 9):
            return "AQI:" + self.__AQI + "-PM2.5:" + self.__PM25 + "-CO:" + self.__CO + "-SO2:" + self.__SO2 + "-PM10:" + self.__PM10 + "-O3:" + self.__O3 + "-NO2:" + self.__NO2 + "-City:" + self.__City + "-Province:" + self.__Province
        else:
            return "Data not saved, cannot output all results."

if __name__ == '__main__':
    air = AIRPojo()
    print(air)

Java is still the language I'm most comfortable in, hence the getter/setter style above, but I can't help liking Python's coroutines.
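As an aside, the same record could be written far more compactly in idiomatic Python. A dataclass sketch, purely for comparison (the project keeps the Java-style class above):

from dataclasses import dataclass
from typing import Optional

@dataclass
class AirRecord:
    # Hypothetical, more Pythonic equivalent of AIRPojo; illustration only
    AQI: Optional[str] = None
    PM25: Optional[str] = None
    CO: Optional[str] = None
    SO2: Optional[str] = None
    PM10: Optional[str] = None
    O3: Optional[str] = None
    NO2: Optional[str] = None
    City: Optional[str] = None
    Province: Optional[str] = None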

Log output

This is a project after all, so there should be settings files; right now we only have one. Of course, since the data source is fixed, there is no need to move the URLs into configuration: if the site changed, the parser wouldn't survive anyway.
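The only name the crawler actually imports from it is LOG_LEVEL, so a minimal sketch of Spider/Setting/Settings.py could be just this (my reconstruction from how it's used below, not the project's actual file):

# Spider/Setting/Settings.py -- minimal sketch. "ALL" prints every completed
# task plus the final summary; "NONE" prints only the final summary (based on
# the checks in parse_city_quality and run below).
LOG_LEVEL = "ALL"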

Writing the crawler

Crawling the home page

First of all, if we want to crawl all the information, we obviously need to parse the home page first; that's what makes a whole-site crawl possible.

Since this is a single request to the home page, instead of writing an asynchronous request I just use requests directly, which also lets me gauge how fast the site responds. After all, we're scraping pages rather than calling an API, so it's bound to be a bit slower.

This part is pretty easy:

    def get_provinces(self):
        response = requests.get(url=self.rootUrl)
        response_data = response.content.decode(self.encoding)
        html = self.parse.HTML(response_data)
        Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

        for Province in Provinces:
            temp = list()
            Province_citys_link = Province.xpath("""./dd/a/@href""")
            Province_citys_name = Province.xpath("""./dd/a/text()""")
            for city_link,city_name in zip(Province_citys_link,Province_citys_name):
                temp.append((self.baseUrl+city_link,city_name))

            self.all_provinces[Province.xpath("./dt/b/text()")[0]] = temp



That saves all the information I want. I also found that the site really is slow to crawl: roughly 2 seconds per result. With 279 city tasks, coroutines and asynchronous requests are clearly the way to go. The alternative of opening threads or a thread pool brings high resource consumption plus read/write safety problems; add the necessary locks and you'll happily max out a single core while the performance advantage evaporates.

Asynchronous crawl

So that's where asynchrony comes in. There are two parts: the asynchronous request itself and a callback function.

    async def fireCity(self, province, url, city, semaphore):
        # Four parameters: the province being crawled, the target URL, the
        # city name, and the semaphore that caps the size of the "pool"
        async with semaphore:
            timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, timeout=timeout) as response:
                        data = await response.text(encoding=self.encoding)
                        return (data, province, city, url)
            except Exception as e:
                # On timeout (or any other error), re-queue the task for a retry
                self.timeout_task.append((province, url, city, semaphore))
    def parse_city_quality(self, task):
        if (task.result()):
            data, province, city, url = task.result()
            html = self.parse.HTML(data)
            airPojo = AIRPojo()

            AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
            airPojo.set_AQI(AQI)

            ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
            airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
            airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
            airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
            airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
            airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
            airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
            airPojo.set_City(city)
            airPojo.set_Province(province)
            self.TIMES += 1  # Record that this city's air quality has been obtained
            if (LOG_LEVEL == "ALL"):
                print("Currently completed task", self.TIMES, airPojo, url)
        else:
            pass

Timeout problems

Earlier we said that access is slow, so some requests are bound to time out, and those tasks have to be re-run. But the task set we hand to asyncio.wait does not support adding tasks dynamically; otherwise we could simply put the timed-out tasks back into it. To get around this I created a separate timeout queue: whenever a request times out, its task goes into that queue. After the first batch of tasks has finished, we check whether anything timed out and, if so, handle it.


    def check(self):
        while (self.timeout_task):
            if (LOG_LEVEL == "ALL"):
                print("Handling timeout connections:", len(self.timeout_task))
            tasks = []
            while (self.timeout_task):
                province, url, city, semaphore = self.timeout_task.pop(0)
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                tasks.append(task)
            self.loop.run_until_complete(asyncio.wait(tasks))


We keep rebuilding the tasks until nothing times out any more. The timeout queue itself is implemented directly with a list.
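As an aside, list.pop(0) shifts every remaining element on each call; with a few hundred tasks that's harmless, but collections.deque would be the more idiomatic queue if the retry backlog ever grew. A sketch with made-up sample data:

from collections import deque

# deque.popleft() is O(1), unlike list.pop(0) which is O(n)
timeout_task = deque()
timeout_task.append(("Hebei", "http://www.tianqihoubao.com/aqi/xxx.html", "XXX city", None))  # sample entry
while timeout_task:
    province, url, city, semaphore = timeout_task.popleft()
    print("retrying", city, url)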

Full crawler code

import time
import aiohttp
import asyncio
import requests
from lxml import etree
import threading
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Setting.Settings import *

class AIR_Quality(object):
    def __init__(self):
        self.all_provinces = {}
        self.headers = {
            "Referer": "http://www.tianqihoubao.com/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36"
        }
        self.parse = etree
        self.encoding = "gbk"
        self.baseUrl = "http://www.tianqihoubao.com"
        self.rootUrl = "http://www.tianqihoubao.com/aqi"
        self.TIMES = 0
        self.PoolSize = 500 # Set system upper limit to 500
        self.tasks = []  # task queue
        self.timeout_task = [] # timeout queue
        self.loop = None
        self.TasKLength = 0
    def __now_thread(self) -> bool:
        # Check whether the current thread is the main thread
        current_name = threading.current_thread().getName()
        if (current_name == "MainThread"):
            return True
        return False


    def get_provinces(self):
        response = requests.get(url=self.rootUrl)
        response_data = response.content.decode(self.encoding)
        html = self.parse.HTML(response_data)
        Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

        for Province in Provinces:
            temp = list()
            Province_citys_link = Province.xpath("""./dd/a/@href""")
            Province_citys_name = Province.xpath("""./dd/a/text()""")
            for city_link,city_name in zip(Province_citys_link,Province_citys_name):
                temp.append((self.baseUrl+city_link,city_name))

            self.all_provinces[Province.xpath("./dt/b/text()")[0]] = temp


    # Data parsing and saving is handled here, as an internal callback
    def parse_city_quality(self, task):
        if (task.result()):
            data, province, city, url = task.result()
            html = self.parse.HTML(data)
            airPojo = AIRPojo()

            AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
            airPojo.set_AQI(AQI)

            ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
            airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
            airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
            airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
            airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
            airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
            airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
            airPojo.set_City(city)
            airPojo.set_Province(province)
            self.TIMES += 1  # Record that this city's air quality has been obtained
            if (LOG_LEVEL == "ALL"):
                print("Currently completed task", self.TIMES, airPojo, url)
        else:
            pass
    async def fireCity(self, province, url, city, semaphore):
        # Four parameters: the province being crawled, the target URL, the
        # city name, and the semaphore that caps the size of the "pool"
        async with semaphore:
            timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, timeout=timeout) as response:
                        data = await response.text(encoding=self.encoding)
                        return (data, province, city, url)
            except Exception as e:
                # On timeout (or any other error), re-queue the task for a retry
                self.timeout_task.append((province, url, city, semaphore))

    def check(self):
        while (self.timeout_task):
            if (LOG_LEVEL == "ALL"):
                print("Handling timeout connections:", len(self.timeout_task))
            tasks = []
            while(self.timeout_task):
                province, url, city, semaphore = self.timeout_task.pop(0)
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                tasks.append(task)
            self.loop.run_until_complete(asyncio.wait(tasks))

    def run(self):

        start = time.time()

        if(not self.all_provinces):
            self.get_provinces()

        semaphore = None

        if (self.__now_thread()):
            # On the main thread, reuse the default event loop
            self.loop = asyncio.get_event_loop()
            semaphore = asyncio.Semaphore(self.PoolSize)
        else:
            # On a worker thread there is no default loop, so create and install one
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            semaphore = asyncio.Semaphore(self.PoolSize)

        # Create the asynchronous task queue
        for province in self.all_provinces.keys():

            citys_info= self.all_provinces.get(province)
            for city_info in citys_info:
                url_marks,city = city_info
                url = ""
                for url_mark in url_marks.split():
                    url+=url_mark
                c = self.fireCity(province,url,city,semaphore)

                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                self.tasks.append(task)
        self.TasKLength = len(self.tasks)

        self.loop.run_until_complete(asyncio.wait(self.tasks))

        self.check()
        if(LOG_LEVEL=="ALL" or LOG_LEVEL=="NONE") :print("Time:", time.time() - start, "Seconds")
            print("Total tasks :",self.TasKLength)
            print("Quantity completed",self.TIMES,"Remaining timeout tasks:".len(self.timeout_task))
if __name__ == '__main__':
    start = time.time()
    air_quality = AIR_Quality()
    air_quality.get_provinces()
    # print(air_quality.all_provinces)
    air_quality.run()




Test

The data checks out, so let's leave it here for now; I still haven't done much algorithm practice for the Blue Bridge Cup, so back to grinding~