In the previous article we looked at Scrapy's core components: what each one does and what happens when it is initialized.

In this article, we'll walk through the core of Scrapy's scraping process: how the engine schedules these components to complete the scraping task.

Running the entry point

Going back to the original entry point, recall from the earlier article on how Scrapy runs that executing the scrapy command does the following:

  • Calls the execute method in cmdline.py
  • Finds the corresponding Command instance and parses the command line
  • Builds a CrawlerProcess instance and calls its crawl and start methods to begin scraping

The crawl method eventually calls crawl on the Crawler instance, which hands control to the Engine, while the start method starts the event loop (the Twisted reactor) and begins scheduling execution asynchronously.

Let's look at the Crawler's crawl method:

@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
    assert not self.crawling, "Crawling already taking place"
    self.crawling = True
    try:
        # create the spider instance
        self.spider = self._create_spider(*args, **kwargs)
        # create the engine
        self.engine = self._create_engine()
        # call the spider's start_requests to get the seed requests
        start_requests = iter(self.spider.start_requests())
        # call the engine's open_spider, handing the seed requests over for scheduling
        yield self.engine.open_spider(self.spider, start_requests)
        yield defer.maybeDeferred(self.engine.start)
    except Exception:
        if six.PY2:
            exc_info = sys.exc_info()
        self.crawling = False
        if self.engine is not None:
            yield self.engine.close()
        if six.PY2:
            six.reraise(*exc_info)
        raise

After creating the spider instance and the engine, the spider's start_requests method is called; it is defined in spiders/__init__.py:

def start_requests(self):
    # generate seed Request objects from the start_urls property
    for url in self.start_urls:
        yield self.make_requests_from_url(url)

def make_requests_from_url(self, url):
    # construct the Request object
    return Request(url, dont_filter=True)

Building the Request

As you can see from the above code, the start_urls property that we normally have to define is used to build a Request here.

class Request(object_ref):

    def __init__(self, url, callback=None, method='GET', headers=None, body=None,
                 cookies=None, meta=None, encoding='utf-8', priority=0,
                 dont_filter=False, errback=None):
        # encoding
        self._encoding = encoding
        # request method
        self.method = str(method).upper()
        # set url
        self._set_url(url)
        # set body
        self._set_body(body)
        assert isinstance(priority, int), "Request priority not an integer: %r" % priority
        # priority
        self.priority = priority
        assert callback or not errback, "Cannot use errback without a callback"
        # callback function
        self.callback = callback
        # error callback function
        self.errback = errback
        # cookies
        self.cookies = cookies or {}
        # build the headers
        self.headers = Headers(headers or {}, encoding=encoding)
        # whether to skip the duplicate filter
        self.dont_filter = dont_filter
        # additional meta information
        self._meta = dict(meta) if meta else None

The Request object is simple, encapsulating Request parameters, Request methods, callbacks, and attribute information that can be attached.

Of course, you can override start_requests and make_requests_from_url in a subclass to build the seed requests with custom logic.
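
For example, a rough sketch of a spider that overrides start_requests to build its own seed requests (the spider name and URLs here are illustrative):

import scrapy


class MySpider(scrapy.Spider):
    name = 'my_spider'

    def start_requests(self):
        # build seed requests with custom logic instead of relying on start_urls
        for page in range(1, 4):
            url = 'http://example.com/list?page=%d' % page
            yield scrapy.Request(url, dont_filter=True)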

Engine scheduling

Returning to the crawl method, engine’s open_spider is called after the seed request object is built:

@defer.inlineCallbacks
def open_spider(self, spider, start_requests=(), close_if_idle=True):
    assert self.has_capacity(), "No free spider slot when opening %r" % \
        spider.name
    logger.info("Spider opened", extra={'spider': spider})
    # register _next_request scheduling method for loop scheduling
    nextcall = CallLaterOnce(self._next_request, spider)
    # initialize the scheduler
    scheduler = self.scheduler_cls.from_crawler(self.crawler)
    # call the spider middleware's process_start_requests to handle the seed requests
    start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
    # encapsulate a Slot object
    slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
    self.slot = slot
    self.spider = spider
    # call the scheduler's open
    yield scheduler.open(spider)
    # call the scraper's open_spider
    yield self.scraper.open_spider(spider)
    # call stats open_spider
    self.crawler.stats.open_spider(spider)
    yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
    # start scheduling
    slot.nextcall.schedule()
    slot.heartbeat.start(5)

Here we first build a CallLaterOnce and then register the _next_request method.

class CallLaterOnce(object):
    # schedules a method to be called repeatedly in the Twisted reactor loop

    def __init__(self, func, *a, **kw):
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        # register self into the reactor's callLater if not already scheduled
        if self._call is None:
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        # callLater was registered with self, so the reactor invokes __call__,
        # which runs the registered method
        self._call = None
        return self._func(*self._a, **self._kw)

This class wraps a method for repeated scheduling: calling schedule registers self into the reactor's callLater, the reactor then invokes __call__ asynchronously, and __call__ finally executes the registered method.

The method registered here is the engine’s _next_request, that is, the method is scheduled in a loop until the program exits.

Next, the spider middleware's process_start_requests method is called. You can define multiple spider middlewares and override this method in each one; before scheduling starts, Scrapy calls each of them in turn to handle the seed requests, where you can filter, transform, or apply whatever logic you want.

This has the advantage of splitting the desired logic into multiple middleware pieces, each of which is functionally independent and can be maintained more cleanly.
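
As a rough sketch (the class name and filtering condition are illustrative, and the middleware must be enabled via the SPIDER_MIDDLEWARES setting), a spider middleware that filters seed requests might look like this:

class FilterStartRequestsMiddleware(object):
    # enabled via SPIDER_MIDDLEWARES in settings.py

    def process_start_requests(self, start_requests, spider):
        # receives the iterator returned by spider.start_requests()
        # and must itself return an iterable of Request objects
        for request in start_requests:
            if 'example.com' in request.url:
                yield request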

The scheduler

Scheduler’s open is called to start scheduling tasks:

def open(self, spider):
    self.spider = spider
    # instantiate the priority queue
    self.mqs = self.pqclass(self._newmq)
    # instantiate the disk-based queue if dqdir is defined
    self.dqs = self._dq() if self.dqdir else None
    # call the open method of the request fingerprint filter
    return self.df.open()

def _dq(self):
    # instantiate the disk queue
    activef = join(self.dqdir, 'active.json')
    if exists(activef):
        with open(activef) as f:
            prios = json.load(f)
    else:
        prios = ()
    q = self.pqclass(self._newdq, startprios=prios)
    if q:
        logger.info("Resuming crawl (%(queuesize)d requests scheduled)",
                    {'queuesize': len(q)}, extra={'spider': self.spider})
    return q

In the open method, the scheduler instantiates the priority queue and decides whether to use a disk queue based on whether dqdir is configured. Finally, it calls the open method of the request fingerprint filter, which is defined in the BaseDupeFilter base class:

class BaseDupeFilter(object):
    # filter base class; subclasses can override the following methods

    @classmethod
    def from_settings(cls, settings):
        return cls()

    def request_seen(self, request):
        # request filtering
        return False

    def open(self):
        # can be overridden to initialize the filter
        pass

    def close(self, reason):
        # can be overridden to clean up when the filter is closed
        pass

    def log(self, request, spider):
        pass

Scrapy's default implementation of this base class is RFPDupeFilter, which is used to filter duplicate requests.

Scraper

Next, the Scraper's open_spider method is called. As mentioned in the previous article, the Scraper class is the bridge between the Engine, Spider, and Item Pipeline components:

@defer.inlineCallbacks
def open_spider(self, spider):
    self.slot = Slot()
    # Call open_spider on all pipelines
    yield self.itemproc.open_spider(spider)

The main logic here is that the Scraper invokes the open_spider method of every pipeline. If we define multiple Pipeline classes, each can override open_spider to do its own initialization before items start arriving.
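
For instance, a minimal sketch of a pipeline that uses open_spider and close_spider to manage an output file (the file name is illustrative):

import json


class JsonWriterPipeline(object):

    def open_spider(self, spider):
        # called once when the spider is opened, before any items arrive
        self.file = open('items.jl', 'w')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        self.file.write(json.dumps(dict(item)) + '\n')
        return item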

Cyclic scheduling

After the open methods of this series of components have been called, nextcall.schedule() finally kicks off scheduling, which repeatedly invokes the _next_request method registered above:

def _next_request(self, spider):
    # this method will schedule in a loop
    slot = self.slot
    if not slot:
        return
    # suspended
    if self.paused:
        return
    # Wait or not
    while not self._needs_backout(spider):
        # fetch a request from the scheduler;
        # note that on the first call there is nothing in the scheduler yet,
        # so the loop breaks out and the logic below runs
        if not self._next_request_from_scheduler(spider):
            break
    # if start_requests has data and no need to wait
    if slot.start_requests and not self._needs_backout(spider):
        try:
            # get the next seed request
            request = next(slot.start_requests)
        except StopIteration:
            slot.start_requests = None
        except Exception:
            slot.start_requests = None
            logger.error('Error while obtaining start requests',
                         exc_info=True, extra={'spider': spider})
        else:
            # call crawl, which actually places the request on the scheduler queue
            self.crawl(request, spider)
    # close the spider when idle
    if self.spider_is_idle(spider) and slot.close_if_idle:
        self._spider_idle(spider)
        
def _needs_backout(self, spider):
    # there are four conditions for backing off:
    # 1. whether the engine is stopping
    # 2. whether the slot is closing
    # 3. whether the downloader has more downloads in flight than the preset limit
    # 4. whether the scraper has more pending responses than the preset limit
    slot = self.slot
    return not self.running \
        or slot.closing \
        or self.downloader.needs_backout() \
        or self.scraper.slot.needs_backout()

def _next_request_from_scheduler(self, spider):
    slot = self.slot
    # take the next request from the scheduler
    request = slot.scheduler.next_request()
    if not request:
        return
    # download
    d = self._download(request, spider)
    # register success, failure, and completion callbacks
    d.addBoth(self._handle_downloader_output, request, spider)
    d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.remove_request(request))
    d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.nextcall.schedule())
    d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    return d
    

def crawl(self, request, spider):
    assert spider in self.open_spiders, \
        "Spider %r not opened when crawling: %s" % (spider.name, request)
    # place the request on the scheduler queue, then call nextcall.schedule()
    self.schedule(request, spider)
    self.slot.nextcall.schedule()

def schedule(self, request, spider):
    self.signals.send_catch_log(signal=signals.request_scheduled,
            request=request, spider=spider)
    # call the scheduler's enqueue_request to place the request on the scheduler queue
    if not self.slot.scheduler.enqueue_request(request):
        self.signals.send_catch_log(signal=signals.request_dropped,
                                    request=request, spider=spider)

The _next_request method first calls _needs_backout to check whether it needs to back off; it waits if any of the following conditions holds:

  • Whether the engine is actively shut down
  • Slot closed or not
  • Whether the downloader exceeds the preset parameters when downloading from the network
  • Whether the Scraper's pending output exceeds the preset limit

If there is no need to wait, the _next_request_from_scheduler method is called, which, as its name suggests, fetches the next Request from the Scheduler.

Note that the first time this method is called, there is no Request in the Scheduler yet, so the loop breaks out and the logic below runs: the crawl method is called to put the seed Request onto the Scheduler's queue, and the enqueuing is checked by the request fingerprint filter for duplicates.

The next time _next_request_from_scheduler is called, the request is fetched from the Scheduler and the download is executed.

Let's start with the first dispatch and look at crawl:

def crawl(self, request, spider):
    assert spider in self.open_spiders, \
        "Spider %r not opened when crawling: %s" % (spider.name, request)
    # put the request onto the Scheduler's queue
    self.schedule(request, spider)
    # Perform the next dispatch
    self.slot.nextcall.schedule()
    
def schedule(self, request, spider):
    self.signals.send_catch_log(signal=signals.request_scheduled,
            request=request, spider=spider)
    # put the request onto the Scheduler's queue
    if not self.slot.scheduler.enqueue_request(request):
        self.signals.send_catch_log(signal=signals.request_dropped,
                                    request=request, spider=spider)

Calling the engine's crawl actually puts a request onto the Scheduler's queue. Here's how a request is enqueued.

Request enqueueing

The Scheduler enqueues the request as follows:

def enqueue_request(self, request):
    # return False if the fingerprint filter identifies the request as a duplicate
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    # Check whether the disk queue is queued successfully
    dqok = self._dqpush(request)
    if dqok:
        self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
    else:
        # if no disk queue is defined, use the memory queue
        self._mqpush(request)
        self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
    self.stats.inc_value('scheduler/enqueued', spider=self.spider)
    return True
    
def _dqpush(self, request):
    # return if no disk queue is defined
    if self.dqs is None:
        return
    try:
        # convert the Request object to a dict
        reqd = request_to_dict(request, self.spider)
        # push onto the disk queue
        self.dqs.push(reqd, -request.priority)
    except ValueError as e:  # non serializable request
        if self.logunser:
            msg = ("Unable to serialize request: %(request)s - reason:"
                   " %(reason)s - no more unserializable requests will be"
                   " logged (stats being collected)")
            logger.warning(msg, {'request': request, 'reason': e},
                           exc_info=True, extra={'spider': self.spider})
            self.logunser = False
        self.stats.inc_value('scheduler/unserializable',
                             spider=self.spider)
        return
    else:
        return True
    
def _mqpush(self, request):
    # push onto the memory queue
    self.mqs.push(request, -request.priority)

As mentioned in the previous article, the scheduler defines two main types of queues: disk-based queues and memory-based queues.

If jobdir is passed in when Scheduler is instantiated, disk queues are used, otherwise memory queues are used, which is the default.
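
For example, enabling the disk queue is just a matter of setting JOBDIR (the directory path here is illustrative); the scheduler then persists its queues, and the dupefilter its fingerprints, under that directory so a crawl can be paused and resumed:

# settings.py (or pass it on the command line: scrapy crawl my_spider -s JOBDIR=crawls/my_spider-1)
JOBDIR = 'crawls/my_spider-1'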

Fingerprint filtering

Before a request is queued, the request fingerprint filter is used to check whether the request is duplicated.

def request_seen(self, request):
    # generate the request fingerprint
    fp = self.request_fingerprint(request)
    # the request is considered a duplicate if its fingerprint is already in the set
    if fp in self.fingerprints:
        return True
    # record the fingerprint if the request is not a duplicate
    self.fingerprints.add(fp)
    # write the fingerprint to the file if a path is configured
    if self.file:
        self.file.write(fp + os.linesep)

def request_fingerprint(self, request):
    # delegates to request_fingerprint in scrapy.utils.request
    return request_fingerprint(request)

The request_fingerprint logic for utils.request is as follows:

def request_fingerprint(request, include_headers=None):
    """Generate request fingerprint"""
    # whether to include headers in the fingerprint
    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
                                 for h in sorted(include_headers))
    cache = _fingerprint_cache.setdefault(request, {})
    if include_headers not in cache:
        # Use SHA1 algorithm to generate fingerprints
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                    for v in request.headers.getlist(hdr):
                        fp.update(v)
        cache[include_headers] = fp.hexdigest()
    return cache[include_headers]

The filter first generates a fingerprint from the Request object using the SHA1 algorithm and records it in a fingerprint set. Before each Request is enqueued, this set is checked; if the fingerprint already exists, the Request is considered a duplicate and is not enqueued again.
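
A quick check of this logic, assuming the older scrapy.utils.request.request_fingerprint helper shown above is importable: because the URL is canonicalized before hashing, query-parameter order does not change the fingerprint.

from scrapy import Request
from scrapy.utils.request import request_fingerprint

fp1 = request_fingerprint(Request('http://example.com/?a=1&b=2'))
fp2 = request_fingerprint(Request('http://example.com/?b=2&a=1'))
assert fp1 == fp2  # canonicalize_url sorts the query string before hashing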

But what if we want to crawl a URL repeatedly, without the duplicate check? As you can see from the first line of enqueue_request, you only need to set dont_filter=True on the Request instance to fetch it repeatedly.

Scrapy uses this logic to filter repeated requests. By default, repeated requests are not scraped.
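
A minimal sketch (the URL is illustrative):

import scrapy

# the second occurrence of this request would be dropped by the dupefilter
req_a = scrapy.Request('http://example.com/page')

# this request is never checked against the fingerprint set
req_b = scrapy.Request('http://example.com/page', dont_filter=True)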

Download request

A request cannot be a duplicate the first time it arrives, so the scheduler enqueues it normally. On the next scheduling pass, _next_request_from_scheduler calls the scheduler's next_request method, which pops a request off the scheduler queue, and then calls the _download method:

def _download(self, request, spider):
    # download request
    slot = self.slot
    slot.add_request(request)
    def _on_success(response):
        # the successful callback result must be a Request or Response
        assert isinstance(response, (Response, Request))
        if isinstance(response, Response):
            # if the download result is a Response, log it, emit the signal, and return it
            response.request = request
            logkws = self.logformatter.crawled(request, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            self.signals.send_catch_log(signal=signals.response_received, \
                response=response, request=request, spider=spider)
        return response

    def _on_complete(_):
        # continue with the next scheduling pass after this download completes
        slot.nextcall.schedule()
        return _

    # call the Downloader to perform the download
    dwld = self.downloader.fetch(request, spider)
    # register the success callback
    dwld.addCallbacks(_on_success)
    # end callback
    dwld.addBoth(_on_complete)
    return dwld

To perform the network download, the Downloader's fetch method is called:

def fetch(self, request, spider):
    def _deactivate(response):
        # remove this record once the download finishes
        self.active.remove(request)
        return response
    # record the request as in progress before downloading
    self.active.add(request)
    # call the downloader middleware's download, passing self._enqueue_request as the download function
    dfd = self.middleware.download(self._enqueue_request, request, spider)
    # register the completion callback
    return dfd.addBoth(_deactivate)

The downloader middleware manager's download method is called here, with _enqueue_request registered as the download function:

def download(self, download_func, request, spider):
    @defer.inlineCallbacks
    def process_request(request):
        # execute each downloader middleware's process_request in order, if defined
        for method in self.methods['process_request']:
            response = yield method(request=request, spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_request must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
            # if a downloader middleware returns a value, that result is returned
            if response:
                defer.returnValue(response)
        # if no downloader middleware returned a value, execute the registered
        # download function, i.e. the Downloader's _enqueue_request
        defer.returnValue((yield download_func(request=request,spider=spider)))

    @defer.inlineCallbacks
    def process_response(response):
        assert response is not None, 'Received None in process_response'
        if isinstance(response, Request):
            defer.returnValue(response)

        # execute each downloader middleware's process_response in order, if defined
        for method in self.methods['process_response']:
            response = yield method(request=request, response=response,
                                    spider=spider)
            assert isinstance(response, (Response, Request)), \
                'Middleware %s.process_response must return Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if isinstance(response, Request):
                defer.returnValue(response)
        defer.returnValue(response)

    @defer.inlineCallbacks
    def process_exception(_failure):
        exception = _failure.value
        # execute each downloader middleware's process_exception in order, if defined
        for method in self.methods['process_exception']:
            response = yield method(request=request, exception=exception,
                                    spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                'Middleware %s.process_exception must return None, Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if response:
                defer.returnValue(response)
        defer.returnValue(_failure)

    # register the execution, error, and success callbacks
    deferred = mustbe_deferred(process_request, request)
    deferred.addErrback(process_exception)
    deferred.addCallback(process_response)
    return deferred

During the download, all defined downloader middlewares are found first (both the built-in ones and any you add yourself). Before the download, each middleware's process_request is executed to process, transform, or validate the Request, and then the real network download is launched via the first parameter, download_func, which in this case is the Downloader's _enqueue_request method.

If none of the process_request methods returns a result, the Downloader's _enqueue_request is called to perform the actual download:

def _enqueue_request(self, request, spider):
    # join the download queue
    key, slot = self._get_slot(request, spider)
    request.meta['download_slot'] = key

    def _deactivate(response):
        slot.active.remove(request)
        return response

    slot.active.add(request)
    deferred = defer.Deferred().addBoth(_deactivate)
    # download queue
    slot.queue.append((request, deferred))
    # Process the download queue
    self._process_queue(spider, slot)
    return deferred
    
def _process_queue(self, spider, slot):
    if slot.latercall and slot.latercall.active():
        return

    # delay processing the queue if a download delay is configured
    now = time()
    delay = slot.download_delay()
    if delay:
        penalty = delay - now + slot.lastseen
        if penalty > 0:
            slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
            return

    # Process the download queue
    while slot.queue and slot.free_transfer_slots() > 0:
        slot.lastseen = now
        # pop a request from the download queue
        request, deferred = slot.queue.popleft()
        # Start downloading
        dfd = self._download(slot, request, spider)
        dfd.chainDeferred(deferred)
        # delay
        if delay:
            self._process_queue(spider, slot)
            break
            
def _download(self, slot, request, spider):
    # register the handlers' download_request as the download function
    dfd = mustbe_deferred(self.handlers.download_request, request, spider)

    # register the download-completed callback
    def _downloaded(response):
        self.signals.send_catch_log(signal=signals.response_downloaded,
                                    response=response,
                                    request=request,
                                    spider=spider)
        return response
    dfd.addCallback(_downloaded)

    slot.transferring.add(request)

    def finish_transferring(_):
        slot.transferring.remove(request)
        # call _process_queue again when the download is complete
        self._process_queue(spider, slot)
        return _

    return dfd.addBoth(finish_transferring)

A download queue is maintained here as well, and it can be configured with a download delay. The actual download is dispatched through handlers.download_request:

def download_request(self, request, spider):
    # get the request scheme
    scheme = urlparse_cached(request).scheme
    # get the download handler for this scheme
    handler = self._get_handler(scheme)
    if not handler:
        raise NotSupported("Unsupported URL scheme '%s': %s" %
                           (scheme, self._notconfigured[scheme]))
    # start the download and return the result
    return handler.download_request(request, spider)
    
def _get_handler(self, scheme):
    # get the corresponding download handler for the scheme
    # the settings define download handlers for http, https, ftp and other schemes
    if scheme in self._handlers:
        return self._handlers[scheme]
    if scheme in self._notconfigured:
        return None
    if scheme not in self._schemes:
        self._notconfigured[scheme] = 'no handler available for that scheme'
        return None

    path = self._schemes[scheme]
    try:
        # instantiate the download handler
        dhcls = load_object(path)
        dh = dhcls(self._crawler.settings)
    except NotConfigured as ex:
        self._notconfigured[scheme] = str(ex)
        return None
    except Exception as ex:
        logger.error('Loading "%(clspath)s" for scheme "%(scheme)s"',
                     {"clspath": path, "scheme": scheme},
                     exc_info=True,  extra={'crawler'self._crawler})
        self._notconfigured[scheme] = str(ex)
        return None
    else:
        self._handlers[scheme] = dh
    return self._handlers[scheme]

Before downloading, the corresponding download handler is obtained by parsing the Request's scheme. The default settings define these download handlers:

DOWNLOAD_HANDLERS_BASE = {
    'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler',
    'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    's3': 'scrapy.core.downloader.handlers.s3.S3DownloadHandler',
    'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
}

The handler's download_request method is then called to perform the network download. We won't go into the details of each handler implementation here; you can simply think of a handler as a wrapped network download library: give it a URL and it returns the download result.
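
Because handlers are looked up through the settings, you can swap one in or out with the DOWNLOAD_HANDLERS setting. A sketch (the custom handler path here is hypothetical):

# settings.py
DOWNLOAD_HANDLERS = {
    'ftp': None,  # disable the built-in FTP handler
    'https': 'myproject.handlers.CustomHTTPSDownloadHandler',  # hypothetical custom handler
}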

During the download process, if an exception occurs, the process_exception method of the downloader middleware is called in turn, and each middleware only needs to define its own exception-handling logic.

If the download is successful, the process_response method of the downloader middleware is executed in turn, and each middleware can further process the downloaded results and eventually return.

It is worth noting that process_request methods run through the middlewares in order, while process_response and process_exception run in reverse order. See DownloaderMiddlewareManager's _add_middleware method to understand how the method chain is registered.
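
As a rough sketch of that calling convention (the class name and header are illustrative, enabled via the DOWNLOADER_MIDDLEWARES setting), a downloader middleware only needs to implement the hooks it cares about:

class CustomHeaderMiddleware(object):
    # process_request runs in the order the middlewares are listed,
    # process_response and process_exception run in reverse order

    def process_request(self, request, spider):
        # returning None lets the next middleware, and finally the download, run
        request.headers.setdefault('X-Custom-Header', '1')
        return None

    def process_response(self, request, response, spider):
        # must return a Response or a Request
        return response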

Once the final download result is available, control returns to ExecutionEngine's _next_request_from_scheduler, where _handle_downloader_output is called; this is the logic that processes the download result:

def _handle_downloader_output(self, response, request, spider):
    # the download result must be a Request, Response, or Failure
    assert isinstance(response, (Request, Response, Failure)), response
    # if it is a Request, call crawl again to put it back on the Scheduler
    if isinstance(response, Request):
        self.crawl(response, spider)
        return
    # if it is a Response or Failure, call the Scraper's enqueue_scrape for
    # further processing by the spider and pipelines
    d = self.scraper.enqueue_scrape(response, request, spider)
    d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                        exc_info=failure_to_exc_info(f),
                                        extra={'spider': spider}))
    return d

After the download result is obtained, there are two main paths:

  • If a Request instance is returned, it goes straight back onto the Scheduler's request queue
  • If a Response or Failure instance is returned, the Scraper's enqueue_scrape method is called for further processing

Handling download results

The enqueue logic has already been covered, so let's focus on the Scraper's enqueue_scrape and see how the Scraper component handles the rest:

def enqueue_scrape(self, response, request, spider):
    # Join the Scrape queue
    slot = self.slot
    dfd = slot.add_response_request(response, request)
    def finish_scraping(_):
        slot.finish_response(response, request)
        self._check_if_closing(spider, slot)
        self._scrape_next(spider, slot)
        return _
    dfd.addBoth(finish_scraping)
    dfd.addErrback(
        lambda f: logger.error('Scraper bug processing %(request)s',
                               {'request': request},
                               exc_info=failure_to_exc_info(f),
                               extra={'spider': spider}))
    self._scrape_next(spider, slot)
    return dfd

def _scrape_next(self, spider, slot):
    while slot.queue:
        # retrieve a pending task from the Scraper queue
        response, request, deferred = slot.next_response_request_deferred()
        self._scrape(response, request, spider).chainDeferred(deferred)

def _scrape(self, response, request, spider):
    assert isinstance(response, (Response, Failure))
    # call _scrape2 to continue processing
    dfd = self._scrape2(response, request, spider)
    # register the error callback
    dfd.addErrback(self.handle_spider_error, request, response, spider)
    # register the output callback
    dfd.addCallback(self.handle_spider_output, request, response, spider)
    return dfd

def _scrape2(self, request_result, request, spider):
    # invoke the spider middleware manager's scrape_response if the result is not a Failure
    if not isinstance(request_result, Failure):
        return self.spidermw.scrape_response(
            self.call_spider, request_result, request, spider)
    else:
        # call call_spider directly
        dfd = self.call_spider(request_result, request, spider)
        return dfd.addErrback(
            self._log_download_errors, request_result, request, spider)

The request and response are added to the Scraper's processing queue, a task is retrieved from the queue, and if the result is not an exception, the spider middleware manager's scrape_response method is invoked:

def scrape_response(self, scrape_func, response, request, spider):
    fname = lambda f:'%s.%s' % (
            six.get_method_self(f).__class__.__name__,
            six.get_method_function(f).__name__)

    def process_spider_input(response):
        # execute each spider middleware's process_spider_input in turn
        for method in self.methods['process_spider_input']:
            try:
                result = method(response=response, spider=spider)
                assert result is None, \
                        'Middleware %s must returns None or ' \
                        'raise an exception, got %s ' \
                        % (fname(method), type(result))
            except:
                return scrape_func(Failure(), request, spider)
        # after all middleware process_spider_input methods have run, call_spider is executed
        return scrape_func(response, request, spider)

    def process_spider_exception(_failure):
        # execute each spider middleware's process_spider_exception in turn
        exception = _failure.value
        for method in self.methods['process_spider_exception']:
            result = method(response=response, exception=exception, spider=spider)
            assert result is None or _isiterable(result), \
                'Middleware %s must returns None, or an iterable object, got %s ' % \
                (fname(method), type(result))
            if result is not None:
                return result
        return _failure

    def process_spider_output(result):
        # execute each spider middleware's process_spider_output in turn
        for method in self.methods['process_spider_output']:
            result = method(response=response, result=result, spider=spider)
            assert _isiterable(result), \
                'Middleware %s must returns an iterable object, got %s ' % \
                (fname(method), type(result))
        return result

    # execute process_spider_input
    dfd = mustbe_deferred(process_spider_input, response)
    # register the exception callback
    dfd.addErrback(process_spider_exception)
    # register the output callback
    dfd.addCallback(process_spider_output)
    return dfd

Does this pattern look familiar? It is much like the downloader middleware: a series of pre-methods run first, then the actual processing logic, and finally a series of post-methods.

Calling back the spider

Let's look at how Scrapy invokes our spider logic: the call_spider method, which calls back into our spider:

def call_spider(self, result, request, spider):
    # call back into the spider module
    result.request = request
    dfd = defer_result(result)
    # use request.callback if defined, otherwise fall back to the spider's parse method
    dfd.addCallbacks(request.callback or spider.parse, request.errback)
    return dfd.addCallback(iterate_spider_output)

This should be familiar: parse is the callback we write most often in spider code. It receives the download result, and any callback defined on a Request is also executed here.
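
As a minimal sketch of that callback flow (the site, selectors and fields are illustrative): parse is the default callback for the seed requests, and a Request may name a different callback explicitly.

import scrapy


class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    start_urls = ['http://quotes.toscrape.com/']

    def parse(self, response):
        # default callback: used because the seed Request set no callback
        for href in response.css('a::attr(href)').extract():
            yield scrapy.Request(response.urljoin(href), callback=self.parse_detail)

    def parse_detail(self, response):
        # explicit callback set on the Request above
        yield {'url': response.url, 'title': response.css('title::text').extract_first()}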

Process the output

After interacting with the spider, the Scraper calls the handle_spider_output method to process the spider's output:

def handle_spider_output(self, result, request, response, spider):
    # Process the crawler output
    if not result:
        return defer_succeed(None)
    it = iter_errback(result, self.handle_spider_error, request, response, spider)
    # registered _process_spidermw_output
    dfd = parallel(it, self.concurrent_items,
        self._process_spidermw_output, request, response, spider)
    return dfd

def _process_spidermw_output(self, output, request, response, spider):
    # handle every Request/Item returned by the spider
    if isinstance(output, Request):
        # if the result is a Request, return it to the Scheduler's request queue
        self.crawler.engine.crawl(request=output, spider=spider)
    elif isinstance(output, (BaseItem, dict)):
        # if the result is a BaseItem or dict
        self.slot.itemproc_size += 1
        # call Pipeline process_item
        dfd = self.itemproc.process_item(output, spider)
        dfd.addBoth(self._itemproc_finished, output, response, spider)
        return dfd
    elif output is None:
        pass
    else:
        typename = type(output).__name__
        logger.error('Spider must return Request, BaseItem, dict or None, '
                     'got %(typename)r in %(request)s',
                     {'request': request, 'typename': typename},
                     extra={'spider': spider})

After executing our custom parsing logic, the parsing method can return a new Request or BaseItem instance.

If it is a new Request, it goes back onto the Scheduler's request queue through the engine; if it is a BaseItem (or dict) instance, the Pipeline manager is invoked, executing each pipeline's process_item in turn. When we want to output results, we simply define a Pipeline class and override this method.

The ItemPipelineManager processing logic:

class ItemPipelineManager(MiddlewareManager):

    component_name = 'item pipeline'

    @classmethod
    def _get_mwlist_from_settings(cls, settings):
        return build_component_list(settings.getwithbase('ITEM_PIPELINES'))

    def _add_middleware(self, pipe):
        super(ItemPipelineManager, self)._add_middleware(pipe)
        if hasattr(pipe, 'process_item'):
            self.methods['process_item'].append(pipe.process_item)

    def process_item(self, item, spider):
        # call each pipeline's process_item in turn
        return self._process_chain('process_item', item, spider)

As you can see, ItemPipelineManager is also a middleware manager, similar to the downloader middleware manager and spider middleware manager before it, and it executes process_item in turn on every pipeline that defines it.

After process_item finishes, _itemproc_finished is called:

def _itemproc_finished(self, output, item, response, spider):
    self.slot.itemproc_size -= 1
    if isinstance(output, Failure):
        ex = output.value
        # If a DropItem exception is thrown in Pipeline processing, ignore the result
        if isinstance(ex, DropItem):
            logkws = self.logformatter.dropped(item, ex, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            return self.signals.send_catch_log_deferred(
                signal=signals.item_dropped, item=item, response=response,
                spider=spider, exception=output.value)
        else:
            logger.error('Error processing %(item)s', {'item': item},
                         exc_info=failure_to_exc_info(output),
                         extra={'spider': spider})
    else:
        logkws = self.logformatter.scraped(output, response, spider)
        logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
        return self.signals.send_catch_log_deferred(
            signal=signals.item_scraped, item=output, response=response,
            spider=spider)

If you want to discard a result in a Pipeline, simply raise a DropItem exception and Scrapy will handle it for you.
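
A minimal sketch of dropping items this way (the field check is illustrative):

from scrapy.exceptions import DropItem


class DropMissingPricePipeline(object):

    def process_item(self, item, spider):
        # _itemproc_finished treats DropItem as "log and ignore", not as an error
        if not item.get('price'):
            raise DropItem('Missing price in %s' % item)
        return item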

At this point, results flow out through our custom output classes to their destinations, and any new Request is enqueued again, waiting for ExecutionEngine's next _next_request call, until there are no tasks left in the request queue and the whole program exits.

CrawlSpider

That covers essentially the whole core scraping process.

The CrawlSpider class we use so often inherits from Spider and overrides the parse method itself (which is why we must not override it), combining it with the Rule class to automatically extract and follow Requests.

Scrapy provides this class so crawlers can be written faster, and we can wrap it further to make writing crawlers even easier.
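
As a rough sketch of how CrawlSpider and Rule fit together (the domain, URL patterns and callback name are illustrative):

from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor


class MyCrawlSpider(CrawlSpider):
    name = 'my_crawl_spider'
    start_urls = ['http://example.com/']

    rules = (
        # follow category pages without a callback, hand item pages to parse_item;
        # parse itself is reserved by CrawlSpider, so we must not override it
        Rule(LinkExtractor(allow=r'/category/'), follow=True),
        Rule(LinkExtractor(allow=r'/item/'), callback='parse_item'),
    )

    def parse_item(self, response):
        yield {'url': response.url}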

From this we can see that each Scrapy module has a very clean implementation: components are wired together through the settings, so if you want to extend or replace one, you only need to define and implement your own processing logic, and no other module is affected. This is exactly how the many Scrapy plugins in the wild are built.

Conclusion

This article contains a lot of code, and it covers the core of Scrapy's scraping process. Once you understand this logic, developing new plugins on top of it becomes very easy.

To summarize, the whole crawling process is expressed most clearly by the two flow diagrams from this series.

My overall impression of Scrapy is that, although it is a single-machine crawler framework, it is very convenient to write plugins or custom components that replace the default functionality and tailor the crawler to our own needs. On top of it you can build a truly powerful crawling framework, with distribution, scheduling, concurrency control, visualization, monitoring and more. It is very flexible.

