Follow the “Water Drop and Silver Bullet” public account to get high-quality technical content as soon as it is published. Written by a senior back-end developer with 7 years of experience who explains technology in plain language.
This article will take about 20 minutes to read.
In the previous article, “What are the core components of Scrapy?”, we examined the main responsibilities of Scrapy’s core components and what they do when they are initialized.
In this article, we’ll look at the core of Scrapy’s scraping process: how the engine schedules these components to complete a scraping task.
The run entry point
Back to the original entry point. As covered in “How does Scrapy run?”, executing the scrapy crawl command does the following:
- Call the execute method in cmdline.py
- Find the corresponding Command instance and parse the command line
- Build a CrawlerProcess instance, then call its crawl and start methods to begin scraping
The crawl method eventually calls crawl on the Crawler instance, handing control to the Engine, while the start method starts the event loop (the Twisted reactor) and begins scheduling execution asynchronously.
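As a rough, minimal sketch of what that entry point amounts to (the spider name 'myspider' is a placeholder), running a spider programmatically looks roughly like what the scrapy crawl command wires up for us:

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

# Roughly what `scrapy crawl myspider` sets up behind the scenes
process = CrawlerProcess(get_project_settings())
process.crawl('myspider')  # creates a Crawler and calls its crawl(), handing control to the Engine
process.start()            # starts the Twisted reactor and the asynchronous scheduling loop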
Let’s look at the Crawler’s crawl method:
@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
    assert not self.crawling, "Crawling already taking place"
    self.crawling = True
    try:
        # Create the spider instance
        self.spider = self._create_spider(*args, **kwargs)
        # Create the engine
        self.engine = self._create_engine()
        # Call the spider's start_requests to get the seed requests
        start_requests = iter(self.spider.start_requests())
        # Hand the seed requests to the engine via open_spider
        yield self.engine.open_spider(self.spider, start_requests)
        yield defer.maybeDeferred(self.engine.start)
    except Exception:
        if six.PY2:
            exc_info = sys.exc_info()
        self.crawling = False
        if self.engine is not None:
            yield self.engine.close()
        if six.PY2:
            six.reraise(*exc_info)
        raise
Here the Crawler creates the spider instance, creates the engine, and calls the spider’s start_requests to get the seed requests. The default start_requests is defined in spiders/__init__.py:
def start_requests(self):
    # Generate seed Request objects from the start_urls attribute
    for url in self.start_urls:
        yield self.make_requests_from_url(url)

def make_requests_from_url(self, url):
    # Construct a Request object
    return Request(url, dont_filter=True)
Building the Request
As you can see from the code above, the start_urls attribute that we normally define is used here to build the Requests.
class Request(object_ref):
    def __init__(self, url, callback=None, method='GET', headers=None, body=None,
                 cookies=None, meta=None, encoding='utf-8', priority=0,
                 dont_filter=False, errback=None):
        # Encoding
        self._encoding = encoding
        # Request method
        self.method = str(method).upper()
        # Set the URL
        self._set_url(url)
        # Set the body
        self._set_body(body)
        assert isinstance(priority, int), "Request priority not an integer: %r" % priority
        # Priority
        self.priority = priority
        assert callback or not errback, "Cannot use errback without a callback"
        # Callback function
        self.callback = callback
        # Error callback function
        self.errback = errback
        # Cookies
        self.cookies = cookies or {}
        # Build the headers
        self.headers = Headers(headers or {}, encoding=encoding)
        # Whether to skip duplicate filtering
        self.dont_filter = dont_filter
        # Additional metadata
        self._meta = dict(meta) if meta else None
The Request object is simple: it encapsulates the request parameters, the request method, callbacks, and extra metadata that can be attached.
Of course, you can override start_requests and make_requests_from_url in a subclass to build the seed requests with custom logic.
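For instance, a minimal sketch of a subclass that builds its own seed requests (the spider name, URL pattern, and page range are made up for illustration):

import scrapy

class NewsSpider(scrapy.Spider):
    name = 'news'  # hypothetical spider name

    def start_requests(self):
        # Build seed requests with custom logic instead of relying on start_urls
        for page in range(1, 4):
            url = 'http://example.com/list?page=%d' % page  # placeholder URL
            yield scrapy.Request(url, callback=self.parse, dont_filter=True)

    def parse(self, response):
        pass  # parsing logic omitted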
Engine scheduling
Returning to the crawl method, the engine’s open_spider is called after the seed requests are built:
@defer.inlineCallbacks
def open_spider(self, spider, start_requests=(), close_if_idle=True):
    assert self.has_capacity(), "No free spider slot when opening %r" % \
        spider.name
    logger.info("Spider opened", extra={'spider': spider})
    # Register the _next_request method for loop scheduling
    nextcall = CallLaterOnce(self._next_request, spider)
    # Initialize the scheduler
    scheduler = self.scheduler_cls.from_crawler(self.crawler)
    # Let the spider middlewares process the seed requests
    start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
    # Encapsulate everything in a Slot object
    slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
    self.slot = slot
    self.spider = spider
    # Call the scheduler's open
    yield scheduler.open(spider)
    # Call the scraper's open_spider
    yield self.scraper.open_spider(spider)
    # Open the stats collector
    self.crawler.stats.open_spider(spider)
    yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
    # Start scheduling
    slot.nextcall.schedule()
    slot.heartbeat.start(5)
Here we first build a CallLaterOnce object that wraps the engine’s _next_request method.
class CallLaterOnce(object):
    # Schedule a method to run in Twisted's reactor, at most once per schedule() call
    def __init__(self, func, *a, **kw):
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        # Only schedule again once the previous call has actually run
        if self._call is None:
            # Register self into the reactor's callLater
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        # self was registered, so __call__ is executed,
        # which in turn runs the registered method
        self._call = None
        return self._func(*self._a, **self._kw)
This class wraps a method so that it can be scheduled repeatedly in the Twisted reactor. Calling schedule registers self into the reactor’s callLater; when the reactor fires the call, __call__ runs, which ultimately executes the registered method.
The method registered here is the engine’s _next_request, that is, the method is scheduled in a loop until the program exits.
Next, the spider middlewares’ process_start_requests method is called. You can define multiple spider middlewares, each overriding this method, and they are called in turn to handle the seed requests before scheduling, so you can filter or transform the requests with whatever logic you want.
This has the advantage of splitting the desired logic into multiple middleware pieces, each of which is functionally independent and can be maintained more cleanly.
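A minimal sketch of such a spider middleware, assuming it is enabled via SPIDER_MIDDLEWARES in the settings (the class name and the filtering rule are made up for illustration):

class SkipDebugUrlsMiddleware(object):
    # Hypothetical spider middleware: filters seed requests before they are scheduled
    def process_start_requests(self, start_requests, spider):
        for request in start_requests:
            # Drop any seed URL containing '/debug/'; pass the rest through unchanged
            if '/debug/' not in request.url:
                yield request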
The scheduler
Scheduler’s open is called to start scheduling tasks:
def open(self, spider):
    self.spider = spider
    # Instantiate the priority queue
    self.mqs = self.pqclass(self._newmq)
    # Instantiate the disk-based queue if dqdir is defined
    self.dqs = self._dq() if self.dqdir else None
    # Call the request fingerprint filter's open method
    return self.df.open()

def _dq(self):
    # Instantiate the disk queue
    activef = join(self.dqdir, 'active.json')
    if exists(activef):
        with open(activef) as f:
            prios = json.load(f)
    else:
        prios = ()
    q = self.pqclass(self._newdq, startprios=prios)
    if q:
        logger.info("Resuming crawl (%(queuesize)d requests scheduled)",
                    {'queuesize': len(q)}, extra={'spider': self.spider})
    return q
In the open method, the scheduler instantiates the priority queue and decides whether to use the disk queue based on whether dqdir is configured. Finally, it calls the request fingerprint filter’s open method, which is defined in the BaseDupeFilter base class:
class BaseDupeFilter(object):
    # Filter base class; subclasses can override the following methods
    @classmethod
    def from_settings(cls, settings):
        return cls()

    def request_seen(self, request):
        # Whether the request is a duplicate
        return False

    def open(self):
        # Filter initialization, can be overridden
        pass

    def close(self, reason):
        # Can be overridden to clean up when the filter is closed
        pass

    def log(self, request, spider):
        pass
Scrapy’s default implementation of this base class is the RFPDupeFilter (request fingerprint duplicate filter), which is what actually filters duplicate requests; we’ll look at its logic below.
Scraper
Next, the Scraper’s open_spider method is called. As mentioned in the previous article, the Scraper class is the bridge between the Engine, the Spider, and the Item Pipelines:
@defer.inlineCallbacks
def open_spider(self, spider):
    self.slot = Slot()
    # Call open_spider on all pipelines
    yield self.itemproc.open_spider(spider)
The main logic here is that the Scraper invokes open_spider on all of the pipelines. If we define multiple pipeline classes, each one can override open_spider to do its own initialization before any items are processed.
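A minimal sketch of a pipeline that uses open_spider for per-spider initialization (the file name pattern is a placeholder; the pipeline would be enabled via ITEM_PIPELINES in the settings):

import json

class JsonWriterPipeline(object):
    # Hypothetical pipeline: open an output file when the spider starts, close it when it ends
    def open_spider(self, spider):
        self.file = open('items-%s.jl' % spider.name, 'w')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        self.file.write(json.dumps(dict(item)) + '\n')
        return item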
Cyclic scheduling
After calling the open methods of this series of components, nextcall.schedule() is finally called to start scheduling, which loops through the _next_request method registered above:
def _next_request(self, spider):
    # This method is scheduled in a loop
    slot = self.slot
    if not slot:
        return
    # Paused
    if self.paused:
        return
    # Keep pulling from the scheduler unless we need to back out
    while not self._needs_backout(spider):
        # Fetch a request from the scheduler;
        # note that on the first pass there is nothing in the scheduler yet,
        # so we break out and run the logic below
        if not self._next_request_from_scheduler(spider):
            break
    # If start_requests still has data and there is no need to back out
    if slot.start_requests and not self._needs_backout(spider):
        try:
            # Get the next seed request
            request = next(slot.start_requests)
        except StopIteration:
            slot.start_requests = None
        except Exception:
            slot.start_requests = None
            logger.error('Error while obtaining start requests',
                         exc_info=True, extra={'spider': spider})
        else:
            # Call crawl, which actually puts the request on the scheduler queue
            self.crawl(request, spider)
    # Close the spider when idle
    if self.spider_is_idle(spider) and slot.close_if_idle:
        self._spider_idle(spider)

def _needs_backout(self, spider):
    # Back out if any of the following holds:
    # 1. The engine is no longer running
    # 2. The slot is closing
    # 3. The downloader has more downloads than the preset limit
    # 4. The scraper has more pending responses than the preset limit
    slot = self.slot
    return not self.running \
        or slot.closing \
        or self.downloader.needs_backout() \
        or self.scraper.slot.needs_backout()

def _next_request_from_scheduler(self, spider):
    slot = self.slot
    # Take the next request from the scheduler
    request = slot.scheduler.next_request()
    if not request:
        return
    # Download it
    d = self._download(request, spider)
    # Register success, failure, and completion callbacks
    d.addBoth(self._handle_downloader_output, request, spider)
    d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.remove_request(request))
    d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.nextcall.schedule())
    d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    return d

def crawl(self, request, spider):
    assert spider in self.open_spiders, \
        "Spider %r not opened when crawling: %s" % (spider.name, request)
    # Put the request on the scheduler queue and schedule the next call
    self.schedule(request, spider)
    self.slot.nextcall.schedule()

def schedule(self, request, spider):
    self.signals.send_catch_log(signal=signals.request_scheduled,
                                request=request, spider=spider)
    # Call the scheduler's enqueue_request to put the request on its queue
    if not self.slot.scheduler.enqueue_request(request):
        self.signals.send_catch_log(signal=signals.request_dropped,
                                    request=request, spider=spider)
The _next_request method first calls _needs_backout to check whether it needs to back out, which is the case if any of the following holds:
- The engine has been stopped (is no longer running)
- The slot is closing
- The downloader has exceeded its preset concurrency limit
- The scraper has more output pending than its preset limit
If there is no need to back out, the _next_request_from_scheduler method is called, which, as its name suggests, fetches the next Request from the Scheduler.
Note that the first time this method is called, there is no Request in the Scheduler yet, so the loop breaks and the logic below runs: the crawl method is called to put the Request on the Scheduler’s queue, and enqueueing is checked by the duplicate request filter.
On subsequent calls, _next_request_from_scheduler fetches a request from the Scheduler and performs the download.
Let’s start with the first dispatch, which calls crawl:
def crawl(self, request, spider):
    assert spider in self.open_spiders, \
        "Spider %r not opened when crawling: %s" % (spider.name, request)
    # Put the request on the Scheduler queue
    self.schedule(request, spider)
    # Trigger the next dispatch
    self.slot.nextcall.schedule()

def schedule(self, request, spider):
    self.signals.send_catch_log(signal=signals.request_scheduled,
                                request=request, spider=spider)
    # Put the request on the Scheduler queue
    if not self.slot.scheduler.enqueue_request(request):
        self.signals.send_catch_log(signal=signals.request_dropped,
                                    request=request, spider=spider)
Calling the engine’s crawl actually puts the request onto a queue inside the Scheduler. Let’s see how the request is enqueued.
Request enqueueing
The Scheduler’s enqueue_request looks like this:
def enqueue_request(self, request):
    # Return False if the filter flags the request as a duplicate
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    # Try to push onto the disk queue first
    dqok = self._dqpush(request)
    if dqok:
        self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
    else:
        # Fall back to the memory queue if no disk queue is defined
        self._mqpush(request)
        self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
    self.stats.inc_value('scheduler/enqueued', spider=self.spider)
    return True

def _dqpush(self, request):
    # Is a disk queue defined?
    if self.dqs is None:
        return
    try:
        # Convert the Request object to a dict
        reqd = request_to_dict(request, self.spider)
        # Push it onto the disk queue
        self.dqs.push(reqd, -request.priority)
    except ValueError as e:  # non serializable request
        if self.logunser:
            msg = ("Unable to serialize request: %(request)s - reason:"
                   " %(reason)s - no more unserializable requests will be"
                   " logged (stats being collected)")
            logger.warning(msg, {'request': request, 'reason': e},
                           exc_info=True, extra={'spider': self.spider})
            self.logunser = False
        self.stats.inc_value('scheduler/unserializable',
                             spider=self.spider)
        return
    else:
        return True

def _mqpush(self, request):
    # Push onto the memory queue
    self.mqs.push(request, -request.priority)
As mentioned in the previous article, the scheduler defines two main types of queues: disk-based queues and memory-based queues.
If jobdir is passed in when the Scheduler is instantiated, the disk queue is used; otherwise the memory queue is used, which is the default.
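A minimal sketch of how the disk queue gets enabled in practice, assuming the directory path is just a placeholder: setting JOBDIR in the project settings (or with -s on the command line) is what makes jobdir non-empty, and it also persists the duplicate filter’s fingerprints so a crawl can be paused and resumed:

# settings.py (sketch) -- persist scheduler queues and the dupefilter state to disk
JOBDIR = 'crawls/my-spider-run-1'  # placeholder directory

# equivalent one-off form:  scrapy crawl myspider -s JOBDIR=crawls/my-spider-run-1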
Fingerprint filtering
Before a request is queued, the request fingerprint filter is used to check whether the request is duplicated.
def request_seen(self, request):
    # Generate the request fingerprint
    fp = self.request_fingerprint(request)
    # If the fingerprint is already in the set, the request is a duplicate
    if fp in self.fingerprints:
        return True
    # Otherwise record the new fingerprint
    self.fingerprints.add(fp)
    # And write it to the file if one is configured
    if self.file:
        self.file.write(fp + os.linesep)

def request_fingerprint(self, request):
    # Delegates to request_fingerprint in scrapy.utils.request
    return request_fingerprint(request)
The request_fingerprint logic for utils.request is as follows:
def request_fingerprint(request, include_headers=None):
    """Generate a request fingerprint"""
    # Whether the fingerprint should include certain headers
    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
                                for h in sorted(include_headers))
    cache = _fingerprint_cache.setdefault(request, {})
    if include_headers not in cache:
        # Use the SHA1 algorithm to generate the fingerprint
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                    for v in request.headers.getlist(hdr):
                        fp.update(v)
        cache[include_headers] = fp.hexdigest()
    return cache[include_headers]
The filter generates a fingerprint from the Request (using the SHA1 algorithm over the request method, the canonicalized URL, and the body) and records it in a fingerprint set. Before each Request is enqueued, its fingerprint is checked against this set; if it is already there, the Request is considered a duplicate and will not be enqueued again.
But what if we want to fetch a URL repeatedly, skipping the duplicate check? As you can see from the first line of enqueue_request, you only need to set dont_filter=True on the Request instance.
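A minimal sketch, as it would appear inside a spider callback (the URL and callback name are placeholders):

# Re-fetch the same URL every time it is yielded, bypassing the RFPDupeFilter
yield scrapy.Request('http://example.com/latest',
                     callback=self.parse_latest,
                     dont_filter=True)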
Scrapy uses this logic to filter repeated requests. By default, repeated requests are not scraped.
Download request
The first time a request comes in it cannot be a duplicate, so the scheduler enqueues it normally. On the next scheduling pass, _next_request_from_scheduler calls the scheduler’s next_request, which pops a request from the queue, and then the engine’s _download method is executed:
def _download(self, request, spider):
    # Download the request
    slot = self.slot
    slot.add_request(request)

    def _on_success(response):
        # The successful result must be a Request or a Response
        assert isinstance(response, (Response, Request))
        if isinstance(response, Response):
            # If the download produced a Response, log it, fire the signal and return it
            response.request = request
            logkws = self.logformatter.crawled(request, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            self.signals.send_catch_log(signal=signals.response_received,
                                        response=response, request=request, spider=spider)
        return response

    def _on_complete(_):
        # Schedule the next pass once this download finishes
        slot.nextcall.schedule()
        return _

    # Hand the request to the Downloader
    dwld = self.downloader.fetch(request, spider)
    # Register the success callback
    dwld.addCallbacks(_on_success)
    # Register the completion callback
    dwld.addBoth(_on_complete)
    return dwld
The Downloader’s fetch is then called to carry out the network download:
def fetch(self, request, spider):
    def _deactivate(response):
        # Remove the record once the download is done
        self.active.remove(request)
        return response

    # Record the request as being processed
    self.active.add(request)
    # Invoke the downloader middlewares; the actual download function is self._enqueue_request
    dfd = self.middleware.download(self._enqueue_request, request, spider)
    # Register the completion callback
    return dfd.addBoth(_deactivate)
The actual download function passed into the downloader middleware manager here is _enqueue_request; the manager’s download method looks like this:
def download(self, download_func, request, spider):
    @defer.inlineCallbacks
    def process_request(request):
        # Run process_request of each downloader middleware that defines it, in order
        for method in self.methods['process_request']:
            response = yield method(request=request, spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                'Middleware %s.process_request must return None, Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
            # If a middleware returns a result, short-circuit and return it
            if response:
                defer.returnValue(response)
        # Otherwise run the registered download function, i.e. the Downloader's _enqueue_request
        defer.returnValue((yield download_func(request=request, spider=spider)))

    @defer.inlineCallbacks
    def process_response(response):
        assert response is not None, 'Received None in process_response'
        if isinstance(response, Request):
            defer.returnValue(response)
        # Run process_response of each downloader middleware that defines it
        for method in self.methods['process_response']:
            response = yield method(request=request, response=response,
                                    spider=spider)
            assert isinstance(response, (Response, Request)), \
                'Middleware %s.process_response must return Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if isinstance(response, Request):
                defer.returnValue(response)
        defer.returnValue(response)

    @defer.inlineCallbacks
    def process_exception(_failure):
        exception = _failure.value
        # Run process_exception of each downloader middleware that defines it
        for method in self.methods['process_exception']:
            response = yield method(request=request, exception=exception,
                                    spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                'Middleware %s.process_exception must return None, Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if response:
                defer.returnValue(response)
        defer.returnValue(_failure)

    # Register the request, error, and response callbacks
    deferred = mustbe_deferred(process_request, request)
    deferred.addErrback(process_exception)
    deferred.addCallback(process_response)
    return deferred
During a download, all defined downloader middlewares are collected first: the built-in ones, plus any you add yourself. Before the real network download, each middleware’s process_request is executed to preprocess, transform, or validate the Request; only then is the actual download launched via the first parameter, download_func, which in this case is the Downloader’s _enqueue_request method.
If no middleware short-circuits with its own result, the Downloader’s _enqueue_request is executed:
def _enqueue_request(self, request, spider):
    # Put the request on the download queue for its slot
    key, slot = self._get_slot(request, spider)
    request.meta['download_slot'] = key

    def _deactivate(response):
        slot.active.remove(request)
        return response

    slot.active.add(request)
    deferred = defer.Deferred().addBoth(_deactivate)
    # Append to the download queue
    slot.queue.append((request, deferred))
    # Process the download queue
    self._process_queue(spider, slot)
    return deferred

def _process_queue(self, spider, slot):
    if slot.latercall and slot.latercall.active():
        return
    # If a download delay is configured, postpone processing the queue
    now = time()
    delay = slot.download_delay()
    if delay:
        penalty = delay - now + slot.lastseen
        if penalty > 0:
            slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
            return
    # Process the download queue
    while slot.queue and slot.free_transfer_slots() > 0:
        slot.lastseen = now
        # Pop the next request from the download queue
        request, deferred = slot.queue.popleft()
        # Start the download
        dfd = self._download(slot, request, spider)
        dfd.chainDeferred(deferred)
        # If a delay is configured, only process one request per pass
        if delay:
            self._process_queue(spider, slot)
            break

def _download(self, slot, request, spider):
    # The registered download function is the handlers' download_request
    dfd = mustbe_deferred(self.handlers.download_request, request, spider)

    # Register the download-finished callback
    def _downloaded(response):
        self.signals.send_catch_log(signal=signals.response_downloaded,
                                    response=response,
                                    request=request,
                                    spider=spider)
        return response
    dfd.addCallback(_downloaded)

    slot.transferring.add(request)

    def finish_transferring(_):
        slot.transferring.remove(request)
        # When the transfer finishes, process the queue again
        self._process_queue(spider, slot)
        return _

    return dfd.addBoth(finish_transferring)
The Downloader also maintains a per-slot download queue, which can be configured with a download delay. The real download is then performed by handlers.download_request:
def download_request(self, request, spider):
    # Get the request's scheme
    scheme = urlparse_cached(request).scheme
    # Get the download handler for this scheme
    handler = self._get_handler(scheme)
    if not handler:
        raise NotSupported("Unsupported URL scheme '%s': %s" %
                           (scheme, self._notconfigured[scheme]))
    # Start the download and return the result
    return handler.download_request(request, spider)

def _get_handler(self, scheme):
    # Get the download handler for the scheme;
    # the settings define handlers for http, https, ftp and other schemes
    if scheme in self._handlers:
        return self._handlers[scheme]
    if scheme in self._notconfigured:
        return None
    if scheme not in self._schemes:
        self._notconfigured[scheme] = 'no handler available for that scheme'
        return None
    path = self._schemes[scheme]
    try:
        # Instantiate the download handler
        dhcls = load_object(path)
        dh = dhcls(self._crawler.settings)
    except NotConfigured as ex:
        self._notconfigured[scheme] = str(ex)
        return None
    except Exception as ex:
        logger.error('Loading "%(clspath)s" for scheme "%(scheme)s"',
                     {"clspath": path, "scheme": scheme},
                     exc_info=True, extra={'crawler': self._crawler})
        self._notconfigured[scheme] = str(ex)
        return None
    else:
        self._handlers[scheme] = dh
        return self._handlers[scheme]
Before downloading, the Request’s URL scheme is parsed to pick the corresponding download handler. The default settings define the following handlers:
DOWNLOAD_HANDLERS_BASE = {
    'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler',
    'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    's3': 'scrapy.core.downloader.handlers.s3.S3DownloadHandler',
    'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
}
The handler’s download_request method then completes the network download. We won’t go into the details of each handler implementation here; you can simply think of a handler as a wrapped network download library: give it a URL and it returns the download result.
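If you ever need to swap or disable a handler, the DOWNLOAD_HANDLERS setting merges over this base mapping; a minimal sketch, where the custom handler class path is hypothetical:

# settings.py (sketch)
DOWNLOAD_HANDLERS = {
    's3': None,  # disable the s3 handler entirely
    'http': 'myproject.handlers.LoggingHTTPDownloadHandler',  # hypothetical custom handler class
}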
During the download process, if an exception occurs, the process_exception method of the downloader middleware is called in turn, and each middleware only needs to define its own exception-handling logic.
If the download is successful, the process_response method of the downloader middleware is executed in turn, and each middleware can further process the downloaded results and eventually return.
It is worth mentioning that the process_request methods are executed in the order the middlewares are registered, while process_response and process_exception are executed in reverse order. You can look at DownloaderMiddlewareManager’s _add_middleware method to see how these method chains are registered.
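A minimal sketch of a custom downloader middleware hooking both directions (the class name and header values are made up; it would be enabled via DOWNLOADER_MIDDLEWARES in the settings):

import random

class RandomUserAgentMiddleware(object):
    # Hypothetical downloader middleware
    USER_AGENTS = ['agent-a/1.0', 'agent-b/2.0']  # placeholder values

    def process_request(self, request, spider):
        # Runs before the download, in registration order; returning None lets the request continue
        request.headers['User-Agent'] = random.choice(self.USER_AGENTS)
        return None

    def process_response(self, request, response, spider):
        # Runs after the download, in reverse registration order; must return a Response or a Request
        return response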
Once the final download result is available, we return to the ExecutionEngine’s _next_request_from_scheduler, where _handle_downloader_output is called; this is the logic that processes the download result:
def _handle_downloader_output(self, response, request, spider):
    # The download result must be a Request, Response, or Failure
    assert isinstance(response, (Request, Response, Failure)), response
    # If it is a Request, call crawl again to put it back on the Scheduler
    if isinstance(response, Request):
        self.crawl(response, spider)
        return
    # If it is a Response or Failure, call the scraper's enqueue_scrape
    # for further processing by the spider and pipelines
    d = self.scraper.enqueue_scrape(response, request, spider)
    d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                        exc_info=failure_to_exc_info(f),
                                        extra={'spider': spider}))
    return d
After getting the download result, there are two main paths:
- If a Request instance is returned, it goes straight back into the Scheduler’s request queue
- If a Response or Failure instance is returned, the Scraper’s enqueue_scrape method is called for further processing
Handling download results
The enqueueing path has already been covered, so let’s focus on the Scraper’s enqueue_scrape and see how the Scraper handles what comes next:
def enqueue_scrape(self, response, request, spider):
    # Add the response/request pair to the scrape queue
    slot = self.slot
    dfd = slot.add_response_request(response, request)

    def finish_scraping(_):
        slot.finish_response(response, request)
        self._check_if_closing(spider, slot)
        self._scrape_next(spider, slot)
        return _

    dfd.addBoth(finish_scraping)
    dfd.addErrback(
        lambda f: logger.error('Scraper bug processing %(request)s',
                               {'request': request},
                               exc_info=failure_to_exc_info(f),
                               extra={'spider': spider}))
    self._scrape_next(spider, slot)
    return dfd

def _scrape_next(self, spider, slot):
    while slot.queue:
        # Take a pending task from the Scraper queue
        response, request, deferred = slot.next_response_request_deferred()
        self._scrape(response, request, spider).chainDeferred(deferred)

def _scrape(self, response, request, spider):
    assert isinstance(response, (Response, Failure))
    # Call _scrape2 to continue processing
    dfd = self._scrape2(response, request, spider)
    # Register the error callback
    dfd.addErrback(self.handle_spider_error, request, response, spider)
    # Register the output callback
    dfd.addCallback(self.handle_spider_output, request, response, spider)
    return dfd

def _scrape2(self, request_result, request, spider):
    # If the result is not a Failure, invoke the spider middleware manager's scrape_response
    if not isinstance(request_result, Failure):
        return self.spidermw.scrape_response(
            self.call_spider, request_result, request, spider)
    else:
        # Otherwise call call_spider directly
        dfd = self.call_spider(request_result, request, spider)
        return dfd.addErrback(
            self._log_download_errors, request_result, request, spider)
The request and response are added to the Scraper’s processing queue, a task is taken from the queue, and if the result is not a Failure, the spider middleware manager’s scrape_response method is invoked:
def scrape_response(self, scrape_func, response, request, spider):
    fname = lambda f: '%s.%s' % (
        six.get_method_self(f).__class__.__name__,
        six.get_method_function(f).__name__)

    def process_spider_input(response):
        # Run process_spider_input of each spider middleware
        for method in self.methods['process_spider_input']:
            try:
                result = method(response=response, spider=spider)
                assert result is None, \
                    'Middleware %s must returns None or ' \
                    'raise an exception, got %s ' \
                    % (fname(method), type(result))
            except:
                return scrape_func(Failure(), request, spider)
        # After the process_spider_input chain, run call_spider
        return scrape_func(response, request, spider)

    def process_spider_exception(_failure):
        # Run process_spider_exception of each spider middleware
        exception = _failure.value
        for method in self.methods['process_spider_exception']:
            result = method(response=response, exception=exception, spider=spider)
            assert result is None or _isiterable(result), \
                'Middleware %s must returns None, or an iterable object, got %s ' % \
                (fname(method), type(result))
            if result is not None:
                return result
        return _failure

    def process_spider_output(result):
        # Run process_spider_output of each spider middleware
        for method in self.methods['process_spider_output']:
            result = method(response=response, result=result, spider=spider)
            assert _isiterable(result), \
                'Middleware %s must returns an iterable object, got %s ' % \
                (fname(method), type(result))
        return result

    # Run process_spider_input
    dfd = mustbe_deferred(process_spider_input, response)
    # Register the exception callback
    dfd.addErrback(process_spider_exception)
    # Register the output callback
    dfd.addCallback(process_spider_output)
    return dfd
Does this pattern look familiar? It is much like the way the downloader middlewares are invoked: a series of pre-processing methods run first, then the actual processing logic, and finally a series of post-processing methods.
Calling back the spider
Now let’s see how Scrapy invokes our spider logic: the call_spider method, which calls back into our spider code:
def call_spider(self, result, request, spider):
    # Call back into the spider module
    result.request = request
    dfd = defer_result(result)
    # Use request.callback if defined, otherwise the spider's parse method
    dfd.addCallbacks(request.callback or spider.parse, request.errback)
    return dfd.addCallback(iterate_spider_output)
This should look familiar: parse is the default callback and the one we write most often in spider code, and any callback we attach to a Request is executed here, receiving the download result.
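A minimal sketch of the spider side of this callback (the spider name, URL, CSS selectors, and item fields are placeholders):

import scrapy

class BookSpider(scrapy.Spider):
    name = 'books'  # hypothetical spider
    start_urls = ['http://example.com/books']  # placeholder URL

    def parse(self, response):
        # Default callback: receives the downloaded Response, yields items and follow-up requests
        for title in response.css('h2.title::text').extract():
            yield {'title': title}
        next_page = response.css('a.next::attr(href)').extract_first()
        if next_page:
            yield scrapy.Request(response.urljoin(next_page), callback=self.parse)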
Process the output
After the spider returns, the Scraper calls the handle_spider_output method to process the spider’s output:
def handle_spider_output(self, result, request, response, spider):
    # Process the spider's output
    if not result:
        return defer_succeed(None)
    it = iter_errback(result, self.handle_spider_error, request, response, spider)
    # Process each output element in parallel with _process_spidermw_output
    dfd = parallel(it, self.concurrent_items,
                   self._process_spidermw_output, request, response, spider)
    return dfd

def _process_spidermw_output(self, output, request, response, spider):
    # Handle each Request/Item yielded by the spider
    if isinstance(output, Request):
        # A Request goes back into the Scheduler's request queue
        self.crawler.engine.crawl(request=output, spider=spider)
    elif isinstance(output, (BaseItem, dict)):
        # A BaseItem or dict goes through the item pipelines
        self.slot.itemproc_size += 1
        # Call the pipelines' process_item
        dfd = self.itemproc.process_item(output, spider)
        dfd.addBoth(self._itemproc_finished, output, response, spider)
        return dfd
    elif output is None:
        pass
    else:
        typename = type(output).__name__
        logger.error('Spider must return Request, BaseItem, dict or None, '
                     'got %(typename)r in %(request)s',
                     {'request': request, 'typename': typename},
                     extra={'spider': spider})
After executing our custom parsing logic, the parse method can yield new Request objects, BaseItem instances, or dicts.
If it is a new Request, it goes back onto the Scheduler’s request queue; if it is a BaseItem or dict, the pipeline manager is called and each pipeline’s process_item is executed in turn. So when we want to output results, we simply define a Pipeline class and override process_item.
The ItemPipelineManager’s processing logic:
class ItemPipelineManager(MiddlewareManager):
    component_name = 'item pipeline'

    @classmethod
    def _get_mwlist_from_settings(cls, settings):
        return build_component_list(settings.getwithbase('ITEM_PIPELINES'))

    def _add_middleware(self, pipe):
        super(ItemPipelineManager, self)._add_middleware(pipe)
        if hasattr(pipe, 'process_item'):
            self.methods['process_item'].append(pipe.process_item)

    def process_item(self, item, spider):
        # Call each pipeline's process_item in turn
        return self._process_chain('process_item', item, spider)
As you can see, ItemPipelineManager is also a middleware manager, similar to the downloader and spider middleware managers we saw earlier, and it executes process_item in turn for every pipeline that defines it.
After executing, call _itemproc_finished:
def _itemproc_finished(self, output, item, response, spider):
    self.slot.itemproc_size -= 1
    if isinstance(output, Failure):
        ex = output.value
        # If a pipeline raised DropItem, drop the result
        if isinstance(ex, DropItem):
            logkws = self.logformatter.dropped(item, ex, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            return self.signals.send_catch_log_deferred(
                signal=signals.item_dropped, item=item, response=response,
                spider=spider, exception=output.value)
        else:
            logger.error('Error processing %(item)s', {'item': item},
                         exc_info=failure_to_exc_info(output),
                         extra={'spider': spider})
    else:
        logkws = self.logformatter.scraped(output, response, spider)
        logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
        return self.signals.send_catch_log_deferred(
            signal=signals.item_scraped, item=output, response=response,
            spider=spider)
So if you want to discard a result from a pipeline, just raise a DropItem exception and Scrapy handles it as shown above.
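A minimal sketch of a pipeline that drops unwanted results (the field name is a placeholder):

from scrapy.exceptions import DropItem

class PriceFilterPipeline(object):
    # Hypothetical pipeline: keep items that have a price, drop the rest
    def process_item(self, item, spider):
        if item.get('price'):
            return item
        raise DropItem('Missing price in %s' % item)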
At this point, the results have been output to their destination by our custom pipeline classes, and any new Requests have been enqueued again, waiting for the next call to the ExecutionEngine’s _next_request, until there are no tasks left in the request queue and the whole program exits.
CrawlSpider
That covers the entire core scraping process.
The CrawlSpider class that we use so often inherits from Spider and overrides the parse method itself (which is why we must not override parse when using it), combining it with the Rule class to implement automatic extraction of follow-up Requests.
Scrapy provides this class so crawlers can be written faster, and we can wrap it further to make writing crawlers even easier.
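A minimal sketch of a CrawlSpider, where the spider name, start URL, and link patterns are placeholders:

from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor

class ShopSpider(CrawlSpider):
    name = 'shop'  # hypothetical spider
    start_urls = ['http://example.com/']  # placeholder URL

    # Rules tell CrawlSpider which links to follow and which callback to use;
    # we implement parse_item, never parse, which CrawlSpider reserves for itself
    rules = (
        Rule(LinkExtractor(allow=r'/category/'), follow=True),
        Rule(LinkExtractor(allow=r'/product/\d+'), callback='parse_item'),
    )

    def parse_item(self, response):
        yield {'url': response.url,
               'title': response.css('h1::text').extract_first()}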
From this we can see that each Scrapy module has a very clean implementation: components are wired together through the settings, so if you want to extend or replace one, you only need to define and implement your own processing logic, and the other modules are unaffected. This is why there are so many Scrapy plugins out there; they are all built on this mechanism.
Conclusion
This article contains a lot of code, covering the core of Scrapy’s scraping process. If you understand this logic, it will be very easy to develop new plugins and build on top of it.
To summarize, the whole crawling process is expressed most clearly by the two flow diagrams that accompany this article.
My overall feeling about Scrapy is that, although it is a single-machine crawler framework, it is very easy to write plugins or custom components that replace the default behavior and tailor our own crawler. On top of it we can eventually build a powerful crawler platform with features such as distributed crawling, scheduling, concurrency control, visualization, and monitoring. It is very flexible.
Crawler series:
- Scrapy source code analysis (a) architecture overview
- Scrapy source code analysis (two) how to run Scrapy?
- Scrapy source code analysis (three) what are the core components of Scrapy?
- Scrapy source code analysis (four) how to complete the scraping task?
- How to build a crawler proxy service?
- How to build a universal vertical crawler platform?
My advanced Python series:
- Python Advanced – How to implement a decorator?
- Python Advanced – How to use magic methods correctly? (Part 1)
- Python Advanced – How to use magic methods correctly? (Part 2)
- Python Advanced – What is a metaclass?
- Python Advanced – What is a context manager?
- Python Advanced – What is an iterator?
- Python Advanced – How to use yield correctly?
- Python Advanced – What is a descriptor?
- Python Advanced – Why does the GIL make multithreading so useless?
Want to read more hardcore technology articles? Follow the “Water Drop and Silver Bullet” public account to get high-quality technical content as soon as it is published. Written by a senior back-end developer with 7 years of experience who explains technology in plain language.