The official documentation of scrapy-Redis is relatively concise, and does not mention the operation principle, so if you want to fully understand the operation principle of distributed crawler, or have to see the source code of scrapy-Redis.
Official site: github.com/rolando/scr…
Scrapy-redis is still a library of redis and scrapy, and it doesn’t do much by itself, so it’s like glue that binds the two together. Let’s take a look at what each source code file scrapy-Redis implements and how to implement a distributed crawler system.
Scrapy-Redis
Scrapy-redis provides the following four components:
Four components means that all four modules need to be modified accordingly
- Scheduler
- Duplication Filter
- Item Pipeline
- Base Spider (inherited class)
The following four components are introduced:
1. Smart laundry list
Scrapy transforms Python’s collection.deque into its own Scrapy queue(github.com/scrapy/queu…). (2) To create a Scrapy queue, which is not supported by the crawler itself, we need to replace this queue with a redis database. Storing requests to crawl from the same Redis-server allows multiple spiders to read from the same database.
The Scheduler Scheduler is responsible for adding new requests to the Scrapy queue and removing the next request from the Scrapy queue. It creates a dictionary structure for the queue to climb in order of priority, such as:
{Priority 0: queue 0 Priority 1: queue 1 Priority 2: queue 2}Copy the code
Then, the queue is determined according to the priority of the request, and the queue with the lower priority is dequeued. To manage this relatively advanced queue dictionary, the Scheduler needs to provide a series of methods. But the original Scheduler is no longer available, so scrapy-redis uses the Scheduler component.
2. Duplication Filter(Filter tool) :
Scrapy implements this function by placing the fingerprint of a request into a Scrapy collection, and comparing the fingerprint of the next request to the collection. If the fingerprint is present in the collection, the request was sent, and if it is not, the operation continues. This core weight judgment function is implemented as follows:
def request_seen(self, request):
# self.request_figerprints 就是一个指纹集合
fp = self.request_fingerprint(request)
# 这就是判重的核心操作
if fp in self.fingerprints:
return True
self.fingerprints.add(fp)
if self.file:
self.file.write(fp + os.linesep)
Copy the code
Duplication in scrapy-Redis is realized by the Duplication Filter component, which skillfully realizes Duplication of Duplication Filter through the redis set non-repetition feature. Scrapy-redis the scrapy-Redis scheduler accepts requests from the engine, prints requests to the Redis set to check for duplicates, and pushes non-duplicates to the Redis Request queue.
When the engine requests a request(sent by a Spider), the scheduler pops a request from the Redis Request queue based on its priority back to the engine, which sends the request to the Spider for processing.
3. Item Pipeline:
The engine feeds the crawled Item to the Item Pipeline, and the scrapy-redis Item Pipeline stores the crawled Item in the Redis items queue.
The modified Item Pipeline makes it easy to extract items from the Items queue by key, thus implementing the Items Processes cluster.
4. The Base spiders:
Instead of scrapy, the RedisSpider inherits both Spider and RedisMixin, the class that reads urls from Redis.
When we generate a Spider that inherits the RedisSpider, we call setup_redis, which connects to the Redis database and sets signals:
- One is the signal when the spider is idle, which calls the spider_IDLE function. This function calls schedule_next_request, keeps the spider alive, and throws a DontCloseSpider exception.
- One is the signal when an item is captured, calling the Item_monopoly function, which calls the schedule_next_request function for the next request.
Scrapy-redis framework implementation process summary:
- To summarize the general idea of scrapy-Redis, this component overrides scheduler and spider classes to allow scheduling, spider startup, and Redis to interact.
- The new dupefilter and Queue classes are implemented to realize the interaction between the container and Redis. Because the crawler process on each host accesses the same REDis database, scheduling and scheduling are unified and managed in a unified manner, achieving the purpose of distributed crawler.
- When a spider is initialized, it initializes a corresponding scheduler object, which reads the Settings and configures its own scheduler queue and dupeFilter.
- Each time a spider produces a request, the scrapy engine sends the reuqest to the corresponding scheduler, which visits Redis to evaluate the request. If not, add it to the scheduler queue in Redis. When the scheduling criteria are met, the Scheduler object takes a request from redis’s scheduler queue and sends it to the spider to climb.
- When the spider has climbed all of the temporarily available urls, the scheduler finds that the redis scheduler queue corresponding to the spider is empty and fires a spider_IDLE signal. The spider receives this signal and connects directly to Redis to read the start_urls pool. Take a new batch of URL entries and repeat the process again.
Two, source code analysis
The following source code is annotated basically, I will explain the important code
1.connection.py
This file is used to connect to redis file, used more, is the most important file
import six from scrapy.utils.misc import load_object from . import defaults # Shortcut maps 'setting name' -> 'parmater Name '. # relationship SETTINGS_PARAMS_MAP = {'REDIS_URL': 'URL', 'REDIS_HOST': 'host', 'REDIS_PORT': 'port', 'REDIS_ENCODING': Def get_redis_from_settings(Settings) def get_redis_from_settings(Settings): """Returns a redis client instance from given Scrapy settings object. This function uses ``get_client`` to instantiate the client and uses ``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You can override them using the ``REDIS_PARAMS`` setting. Parameters ---------- settings : Settings A scrapy settings object. See the supported settings below. Returns ------- server Redis client instance. Other Parameters ---------------- REDIS_URL : str, optional Server connection URL. REDIS_HOST : str, optional Server host. REDIS_PORT : str, optional Server port. REDIS_ENCODING : str, optional Data encoding. REDIS_PARAMS : Dict, optional Additional Client parameters. """ # shallow copy to prevent changes to params, This will cause the default SETTINGS_PARAMS to be changed params = defaults.redis_params.copy () # update the Settings into params params.update(settings.getdict('REDIS_PARAMS')) # XXX: Deprecate REDIS_* Settings. # Traversing the mapping table to obtain the specified parameter for source, dest in settings_params_map.items (): Val = settings.get(source) params does not update if val if it is not set: params[dest] = val # Allow ``redis_cls`` to be a path to a class. if isinstance(params.get('redis_cls'), six.string_types): params['redis_cls'] = load_object(params['redis_cls']) return get_redis(**params) # Backwards compatible alias. from_settings = get_redis_from_settings def get_redis(**kwargs): """Returns a redis client instance. Parameters ---------- redis_cls : class, optional Defaults to ``redis.StrictRedis``. url : str, optional If given, ``redis_cls.from_url`` is used to instantiate the class. **kwargs Extra parameters to be passed to the ``redis_cls`` Redis_cls = kwargs.pop('redis_cls', Url = kwargs.pop('url', None) if url: return redis_cls.from_url(url, **kwargs) else: Return redis_cls(**kwargs)Copy the code
Connection provides an important function, from_settings = get_redis_from_settings. This function imports the defualt.py file that defines the fingerprint we accessed. Pipline, Queue, schedule files are all called.
2.defaults.py
It mainly stores the default parameters
Import redis # For standalone use. # delete key DUPEFILTER_KEY = 'dupefilter:%(timestamp)s' # The spiders are PIPELINE_KEY = '%(spider)s:items' #redis StrictRedis # character set encoding REDIS_ENCODING = 'UTF-8' # Sane Connection defaults. # redis REDIS_PARAMS = { 'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': SCHEDULER_QUEUE_KEY = '%(spider)s:requests' # priority queue Is used to specify the way in and out of the queue SCHEDULER_QUEUE_CLASS = 'scrapy_redis. Queue. PriorityQueue' # used to heavy key, SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter' # SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis. Dupefilter. RFPDupeFilter' # starting url corresponding key START_URLS_KEY = '% (name) s: start_urls # starting url type START_URLS_AS_SET = FalseCopy the code
3. dupefilter.py
Scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy scrapy
import logging import time from scrapy.dupefilters import BaseDupeFilter from scrapy.utils.request import request_fingerprint from . import defaults from .connection import get_redis_from_settings logger = logging.getLogger(__name__) # TODO: Rename class to RedisDupeFilter. class RFPDupeFilter(BaseDupeFilter): """ Redis-based request duplicates filter. This class can also be used with default Scrapy's scheduler. """ logger = logger def __init__(self, server, key, debug=False): "" Initialize the Duplicates filter. parametersparameter ---------- #server: specifies the connection instance of Redis. redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. Debug: bool, optional Whether to log filtered requests. Whether to record filtered requests "" self.server = server self.key = key self.debug = debug self.logdupes = True # the current method passed by the classmethod @classmethod def from_settings(cls, settings): """Returns an instance from given settings. This uses by default the key ``dupefilter:<timestamp>``. When using the ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : Scrapy.settings. Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance." get_redis_from_settings(settings) # XXX: This creates one-time key. needed to support to use this # class as standalone dupefilter with scrapy's default scheduler # if scrapy passes spider on open() method this wouldn't be needed # TODO: SCRAPY_JOB env as default and fallback to timestamp. # SCRAPY_JOB env as default and fallback to timestamp. Int (time.time())} # use default value Flase debug = settings.getbool('DUPEFILTER_DEBUG') # pass to current class, Return CLS (server, key=key, debug=debug) @classmethod def from_crawler(CLS, crawler): """Returns instance from crawler. Parameters ---------- crawler : scrapy.crawler.Crawler Returns ------- RFPDupeFilter Instance of RFPDupeFilter. """ return cls.from_settings(crawler.settings) def request_seen(self, request): """Returns True if request was already seen. Parameters ---------- request : Scrapy.http. Request Returns ------- bool "" #s generate a fingerprint fp = self.request_fingerprint(Request) # This Returns the number of values added, Fingerprint is a collection type #self. server Redis connection instance #self.key is the key to store the fingerprint fp fingerprint #self.key already exists return 0, Sadd (self.key, fp) # If added is 0, the fingerprint exists. False return Added == 0 def request_fingerprint(self, request): """Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request Returns ------- str """ return request_fingerprint(request) @classmethod def from_spider(cls, spider): settings = spider.settings server = get_redis_from_settings(settings) dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY) key = dupefilter_key % {'spider': Spider. Name} debug= settings.getbool('DUPEFILTER_DEBUG') return CLS (server, key=key, debug=debug) Def close(self, reason=''): """Delete data on close. Called by Scrapy's scheduler. Parameters ---------- reason: str, optional """ self.clear() def clear(self): """Clears fingerprints data.""" self.server.delete(self.key) def log(self, request, spider): """Logs given request. Parameters ---------- request : scrapy.http.Request spider : scrapy.spiders.Spider """ if self.debug: msg = "Filtered duplicate request: %(request)s" self.logger.debug(msg, {'request': request}, extra={'spider': spider}) elif self.logdupes: msg = ("Filtered duplicate request %(request)s" " - no more duplicates will be shown" " (see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes = FalseCopy the code
The request_seen () method is replaced directly with the database storage method, identifying duplicates, and using fingerprints, which again rely on both request_fingerprint() methods. Add the fingerprint directly to the collection. If the fingerprint is added successfully, the fingerprint does not originally exist in the collection. Return 1. The final result of the code is to determine whether the result of the addition is 0. If the result was 1, then the result is false, that is, not repeated, otherwise it is repeated.
4. picklecompat.py
The loads and dumps functions are implemented, essentially implementing a serializer.
Since the Redis database can’t store complex objects (the key part can only be strings, the value part can only be strings, string lists, string collections, and hashes), we need to serialize everything to text first.
This is python’s pickle module, a py2 and Py3 compatible serial chemical. The Serializer is primarily used to store reuqest objects during the scheduler session.
"""A pickle wrapper module with protocol=-1 by default.""" try: import cPickle as pickle # PY2 except ImportError: Def dumps(obj) def dumps(obj) -loads (loads) def dumps(obj) -loads (loads) -loads (loads) return pickle.dumps(obj, protocol=-1)Copy the code
5. piplines.py
Data that is used to process crawlers is serialized into Redis
from scrapy.utils.misc import load_object from scrapy.utils.serialize import ScrapyJSONEncoder from twisted.internet.threads import deferToThread from . import connection, Default_serialize = ScrapyJSONEncoder().encode: """Pushes serialized item into a redis list/queue Settings -------- REDIS_ITEMS_KEY : str Redis key where to store items. REDIS_ITEMS_SERIALIZER : str Object path to serializer function. """ def __init__(self, server, key=defaults.PIPELINE_KEY, serialize_func=default_serialize): """Initialize pipeline. Parameters ---------- server : StrictRedis Redis client instance. key : str Redis key where to store items. serialize_func : Callable Items serializer function. """ self.server = server self.key = key self.serialize = serialize_func # Pass the class itself into the function @classmethod def from_settings(CLS, Settings): #from_settings = get_redis_from_settings # params = {'server': Connection.from_settings (Settings),} # If there is an item_key in the Settings, we use the Settings if settings.get('REDIS_ITEMS_KEY'): Params ['key'] = Settings ['REDIS_ITEMS_KEY'] # if Settings. Get ('REDIS_ITEMS_SERIALIZER'): Params ['serialize_func'] = load_object(Settings ['REDIS_ITEMS_SERIALIZER']) # return CLS (**params) @classmethod def from_crawler(cls, crawler): Cls.from_settings (crawler.settings) # def process_item(self, item, spider) def process_item(self, item, spider) # create a thread to store items, that is, the last item has not been stored. Def process_item(self, item, spider) def process_item(self, item, spider): Key = self.item_key(item, spider) Serialize item as a string data = self.serialize(item) #self.server is a connection instance of redis self.server.rpush(key, Def item_key(self, item, spider) def item_key(self, item, spider): """Returns redis key based on given spider. Override this function to use a different key depending on the item and/or Spider. "" # self.key='%(spider)s:items'=%(spider. Name)s:items' return self.key % {'spider': spider.name} # format stringCopy the code
6.queue.py
Crawl queue, there are three queue implementation, first it implements a parent class base, provide some basic methods and attributes
from scrapy.utils.reqser import request_to_dict, request_from_dict from . import picklecompat class Base(object): """Per-spider base queue class""" def __init__(self, server, spider, key, serializer=None): """Initialize per-spider redis queue. Parameters ---------- server : StrictRedis Redis client instance. spider : Spider Scrapy spider instance. key: str Redis key where to put and get messages. serializer : object Serializer object with ``loads`` and ``dumps`` methods. """ if serializer is None: # Backward compatibility. # TODO: Deprecate pickle. serializer = picklecompat # loads must be carried out in the serialization so that loads must be carried out if not hasattr(serializer, 'loads'): raise TypeError("serializer does not implement 'loads' function: %r" % serializer) if not hasattr(serializer, 'dumps'): raise TypeError("serializer '%s' does not implement 'dumps' function: Self. server = server self.spider = spider self.key = key % {'spider': self.server = server self.spider = key % {'spider': Def _encode_request(self, request) def _encode_request(self, request): Obj = request_to_dict(request, Dumps (obj) def _decode_request(self, def _decode_request) def _decode_request(self, def _decode_request) encoded_request): """Decode an request previously encoded""" obj = self.serializer. Loads (encoded_request) # Convert dict to request objects Return request_from_dict(obj, self.spider) # len must be overloaded otherwise def __len__(self) cannot be used: """Return the length of the queue""" raise NotImplementedError def push(self, request): """Push a request""" raise NotImplementedError def pop(self, timeout=0): """Pop a request""" raise NotImplementedError # raise self.key def clear(self): """Clear queue/stack""" self.server.delete(self.key)Copy the code
First take a look at the encode_request() and decode_request() methods. We store a Reques object in the database, but the database cannot store the object directly, so we need to serialize the request into a string. These two methods can serialize and deserialize the request, respectively, using the pickle library. Queue QUEU will call encode_request () method for serialization when calling push() method to store the request into the database, and call decode_request() method for deserialization when calling POP () to fetch the request