Only three steps

  1. First, build a single-machine crawler: a set deduplicates URLs, and a list (append to push, pop to pull) serves as the task queue (see the sketch after this list)
  2. Then swap the in-memory set and list for a Redis set and list
  3. Finally, run it with multiple processes or on multiple servers (a deployment sketch follows the sample code)
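
A minimal sketch of steps 1 and 2, assuming a placeholder crawl_one(url) that returns the new URLs found on a page; the only thing that changes between the two steps is where the dedup set and the task queue live:

# Step 1: single-machine version. `crawl_one` is a placeholder for whatever
# fetch-and-parse logic yields the new URLs found on a page.
def crawl_one(url):
    return []

already_crawled = set()                          # dedup
task_queue = ['https://example.com/start']       # append to push, pop(0) to pull

while task_queue:
    url = task_queue.pop(0)
    if url in already_crawled:
        continue
    already_crawled.add(url)
    for new_url in crawl_one(url):
        task_queue.append(new_url)

# Step 2: the same four operations map one-to-one onto Redis commands, so any
# number of processes or machines can share a single set and queue:
#   url in already_crawled     ->  SISMEMBER already_crawl_urls url
#   already_crawled.add(url)   ->  SADD      already_crawl_urls url
#   task_queue.append(new_url) ->  RPUSH     author_url_queue new_url
#   task_queue.pop(0)          ->  LPOP      author_url_queue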

The sample code

# -*- coding: utf-8 -*-

# Author: Mulberry ICE
# Email: [email protected]
# Blog: iicey.github.io
# JueJin: juejin.im/user/5c64dce8e51d45013c40742c
import asyncio
import logging
import os

import aiohttp
import aioredis
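# aiomultiprocess.Pool runs asyncio coroutines across multiple worker processes (step 3)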
from aiomultiprocess import Pool
from lxml import etree

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])  # logger named after this script
user = 'root'
password = '123546'
host = '127.0.0.1'
port = 6379


class MoreAuthor:

    def __init__(self, conn, redis):
        self.conn = conn
        self.redis = redis

    async def req_res(self, session, url: str, result_type: str = 'text'):
        if result_type not in {'text', 'bytes', 'json'}:
            raise TypeError(f"The result_type cannot be {result_type}. Use only: text, bytes, json!")
        if await self.redis.sismember('already_crawl_urls', url):
            logger.debug(f"duplicate URL: {url}")
            return
        else:
            await self.redis.sadd('already_crawl_urls', url)
        async with session.get(url, verify_ssl=False) as result:
            if result_type == 'text':
                return await result.text()
            if result_type == 'bytes':
                return await result.read()
            if result_type == 'json':
                return await result.json()

    async def product_and_consumer(self, session, main_url):
        if 'follow' not in main_url:
            main_url += '/follow?condition=0&p=1'
        main_result = await self.req_res(session, main_url)
        if main_result:
            logger.info(f"auth_main: {main_url}")
            author_url_list = await self.parse_more_author(session, main_result)
            for author_url in author_url_list:
                author_url = str(author_url)
                if not await self.redis.sismember('already_crawl_urls', author_url):
                    logger.info(f"added author URL: {author_url}")
                    await self.redis.sadd('already_crawl_urls', author_url)
                    await self.redis.rpush('author_url_queue', author_url)

    async def parse_more_author(self, session, main_result):
        tree = etree.HTML(main_result)
        author_url_list = list(tree.xpath('//*[@class="author-info-title"]/a/@href'))
        page_url_list = tree.xpath('//*[@id="laypage_0"]/a/@href')
        for page_url in page_url_list:
            if page_url[0] == '/':
                page_url = ''.join(['https://www.zcool.com.cn', page_url])
            page_result = await self.req_res(session, str(page_url))
            if page_result:
                # logger.info(f"author_page: {page_url}")
                next_author_url_list = await self.parse_more_author(session, page_result)
                author_url_list.extend(next_author_url_list)
        return author_url_list

    async def author_run(self):
        start_url = 'https://morncolour.zcool.com.cn/follow?condition=0&p=1'
        sleep_count = 0
        async with aiohttp.ClientSession() as session:
            if not await self.redis.sismember('already_crawl_urls', start_url):
                await asyncio.create_task(self.product_and_consumer(session, start_url))
            while True:
                # exit once the queue has been empty for 60 consecutive seconds
                if sleep_count >= 60:
                    break
                if await self.redis.llen('author_url_queue') == 0:
                    sleep_count += 1
                    await asyncio.sleep(1)
                    continue
                sleep_count = 0
                task = asyncio.create_task(
                    self.product_and_consumer(session, await self.redis.lpop('author_url_queue')))
                await task


async def run(num):
    logger.info(f"this is process task {num}")
    conn = await aioredis.create_pool(f'redis://{user}:{password}@{host}:{port}', encoding='utf-8')
    redis = aioredis.Redis(pool_or_conn=conn)
    m = MoreAuthor(conn, redis)
    await m.author_run()
    conn.close()
    await conn.wait_closed()


async def main():
    async with Pool() as pool:
        result = await pool.map(run, range(4))
        logger.info(result)


if __name__ == '__main__':
    asyncio.run(main())
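
For step 3, nothing in the crawler logic needs to change to run on several servers, because the dedup set and the task queue already live in a shared Redis instance; each extra process or machine only has to point at the same host. A small sketch (not part of the project; the environment variable names and script name are assumptions) of making the connection settings configurable per machine:

import os

# hypothetical environment-based configuration; variable names are assumptions
host = os.environ.get('REDIS_HOST', '127.0.0.1')
port = int(os.environ.get('REDIS_PORT', 6379))
password = os.environ.get('REDIS_PASSWORD', '123546')

# each server then launches the same entry point, for example:
#   REDIS_HOST=10.0.0.5 python zcool_crawler.py
# and all workers share one 'already_crawl_urls' set and one 'author_url_queue' list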

The project address

github.com/iicey/zcool