Only three steps
- Firstly, a single crawler is developed, which requires set to deduplicate the URL, and pop and Append of List to realize the task queue
- Then change set and List to Redis set and List instead
- Finally, use multiple processes or run on multiple servers
The sample code
# -*- coding: utf-8 -*-
# Author: Mulberry ICE
# Email: [email protected]
# Blog: iicey.github.io
# JueJin: juejin.im/user/5c64dce8e51d45013c40742c
import asyncio
import logging
import os
import aiohttp
import aioredis
from aiomultiprocess import Pool
from lxml import etree
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(os.path.split(os.path.basename(__file__))[0])
user = 'root'
password = '123546'
host = '127.0.0.1'
port = 6379
class MoreAuthor:
def __init__(self, conn, redis):
self.conn = conn
self.redis = redis
async def req_res(self, session, url: str, result_type: str = 'text'):
if result_type not in {'text'.'bytes'.'json'} :raise TypeError(f"The result_type cannot be {result_type}. Use only: text, bytes, json!")
if await self.redis.sismember('already_crawl_urls', url):
logger.debug(F "duplicate URL:{url}")
return
else:
self.redis.sadd('already_crawl_urls', url)
async with session.get(url, verify_ssl=False) as result:
if result_type == 'text':
return await result.text()
if result_type == 'bytes':
return await result.read()
if result_type == 'json':
return await result.json()
async def product_and_consumer(self, session, main_url):
if 'follow' not in main_url:
main_url += '/follow? condition=0&p=1'
main_result = await self.req_res(session, main_url)
if main_result:
logger.info(f"auth_main: {main_url}")
author_url_list = await self.parse_more_author(session, main_result)
for author_url in author_url_list:
author_url = str(author_url)
if not await self.redis.sismember('already_crawl_urls', author_url):
logger.info(F "added author URl:{author_url}")
await self.redis.sadd('already_crawl_urls', author_url)
await self.redis.rpush('author_url_queue', author_url)
async def parse_more_author(self, session, main_result):
tree = etree.HTML(main_result)
author_url_list = list(tree.xpath('//*[@class="author-info-title"]/a/@href'))
page_url_list = tree.xpath('//*[@id="laypage_0"]/a/@href')
for page_url in page_url_list:
if page_url[0] = ='/':
page_url = ' '.join(['https://www.zcool.com.cn', page_url])
page_result = await self.req_res(session, str(page_url))
if page_result:
# logger.info(f"author_page: {page_url}")
next_author_url_list = await self.parse_more_author(session, page_result)
author_url_list.extend(next_author_url_list)
return author_url_list
async def author_run(self):
start_url = 'https://morncolour.zcool.com.cn/follow?condition=0&p=1'
sleep_count = 0
async with aiohttp.ClientSession() as session:
if not await self.redis.sismember('already_crawl_urls', start_url):
await asyncio.create_task(self.product_and_consumer(session, start_url))
while 1:
if sleep_count >= 60:
break
if await self.redis.llen('author_url_queue') = =0:
sleep_count += 1
await asyncio.sleep(1)
continue
sleep_count = 0
task = asyncio.create_task(self.product_and_consumer
(session, await self.redis.lpop('author_url_queue')))
await task
async def run(num):
logger.info(F "this is the first{num}Process tasks")
conn = await aioredis.create_pool(f'redis://{user}:{password}@{host}:{port}', encoding='utf-8')
redis = aioredis.Redis(pool_or_conn=conn)
m = MoreAuthor(conn, redis)
await m.author_run()
conn.close()
await conn.wait_closed()
async def main(a):
async with Pool() as pool:
result = await pool.map(run, range(4))
logger.info(result)
if __name__ == '__main__':
asyncio.run(main())
Copy the code
The project address
github.com/iicey/zcool