import json
import logging
import warnings
from collections import defaultdict
from datetime import datetime
from typing import (
Any,
AsyncIterable,
Dict,
Generator,
Iterable,
List,
Optional,
Set,
Union,
)
from warnings import warn
from scrapy import Request, Spider
from scrapy.crawler import Crawler
from scrapy.dupefilters import RFPDupeFilter
from scrapy.exceptions import IgnoreRequest, NotConfigured
from scrapy.http import Response
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.url import url_is_from_any_domain
from scrapy_poet import DynamicDeps
from zyte_common_items import Article, ArticleNavigation, Item
try:
from scrapy.downloadermiddlewares.offsite import OffsiteMiddleware
except ImportError:
from scrapy.spidermiddlewares.offsite import OffsiteMiddleware # type: ignore[assignment]
from zyte_spider_templates.utils import get_domain
# Module-level logger used by the middlewares defined in this file.
logger = logging.getLogger(__name__)
# Ignore warnings whose message matches the pattern below.
# NOTE(review): ``message`` is a regular expression that must match the
# *start* of the warning text, so this filter only applies to warnings that
# begin with "model will result in indexing error" -- confirm that this is the
# intended target (a leading ".*" may be missing).
warnings.filterwarnings("ignore", message="model will result in indexing errors*")
[docs]
class CrawlingLogsMiddleware:
    """Log what each visited page was parsed as and which requests it produced.

    The motivation for such logs is to easily debug the crawling behavior and
    see what went wrong. Apart from a high-level summary (number of requests
    per page type), the log entry also includes JSON-formatted data so that it
    can easily be parsed later on.

    Some notes:

    - Request fingerprints are computed with the crawler's
      ``request_fingerprinter`` and logged as hexadecimal strings.
    - Per-request details are read from the ``crawling_logs`` dictionary in
      the request metadata; when that dictionary exists, the request URL,
      priority and fingerprint are written into it as well.
    """

    # Page type under which requests without a ``page_type`` are grouped.
    unknown_page_type = "unknown"

    @classmethod
    def from_crawler(cls, crawler):
        """Build the middleware, supporting subclasses with a legacy ``__init__``.

        Subclasses whose ``__init__`` does not accept the crawler trigger a
        ``DeprecationWarning`` and get the crawler assigned after construction.
        """
        try:
            return cls(crawler)
        except TypeError:
            warn(
                (
                    "Subclasses of CrawlingLogsMiddleware must now accept a "
                    "crawler parameter in their __init__ method. This will "
                    "become an error in the future."
                ),
                DeprecationWarning,
            )
            middleware = cls()
            middleware._crawler = crawler
            return middleware

    def __init__(self, crawler=None):
        self._crawler = crawler

    def _fingerprint(self, request):
        """Return the hexadecimal fingerprint of *request*."""
        fingerprinter = self._crawler.request_fingerprinter
        return fingerprinter.fingerprint(request).hex()

    def process_spider_output(self, response, result, spider):
        """Log the crawling report for *response* and pass the output through.

        The output is materialized into a list because it is iterated once to
        build the report and then returned to the caller.
        """
        entries = list(result)
        logger.info(self.crawl_logs(response, entries))
        return entries

    def crawl_logs(self, response, result):
        """Return the textual crawling report for *response* and its output.

        Only ``Request`` objects found in *result* are reported; they are
        grouped by the ``page_type`` found in their ``crawling_logs`` metadata.
        """
        response_logs = response.meta.get("crawling_logs", {})
        parsed_as = response_logs.get("page_type")
        to_crawl = defaultdict(list)
        data: Dict[str, Any] = {
            "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "current": {
                "url": response.url,
                "request_url": response.request.url,
                # TODO: update this when the following is updated to use the same fingerprinter
                # with Scrapy: https://github.com/scrapinghub/scrapinghub-entrypoint-scrapy/
                "request_fingerprint": self._fingerprint(response.request),
                "page_type": parsed_as,
                "probability": response_logs.get("probability"),
            },
            "to_crawl": to_crawl,
        }

        outgoing = (
            [entry for entry in result if isinstance(entry, Request)]
            if result
            else []
        )
        for request in outgoing:
            # When the request carries a ``crawling_logs`` dict, it is updated
            # in place, so the extra keys also end up in the request metadata.
            request_logs = request.meta.get("crawling_logs", {})
            fingerprint = self._fingerprint(request)
            request_logs.update(
                request_url=request.url,
                request_priority=request.priority,
                request_fingerprint=fingerprint,
            )
            group = request_logs.get("page_type") or self.unknown_page_type
            to_crawl[group].append(request_logs)

        if to_crawl:
            summary = ["Number of Requests per page type:"]
            summary.extend(
                f"- {page_type}: {len(requests)}"
                for page_type, requests in to_crawl.items()
            )
        else:
            summary = ["Nothing to crawl."]

        header = f"Crawling Logs for {response.url} (parsed as: {parsed_as}):"
        structured = json.dumps(data, indent=2)
        return "\n".join([header, "\n".join(summary), "Structured Logs:", structured])
class AllowOffsiteMiddleware(OffsiteMiddleware):
    """Offsite middleware that lets explicitly allowed requests through.

    A request is always followed when its metadata contains a ``zyte_api`` key
    or when its ``allow_offsite`` metadata value is exactly ``True``. Any other
    request is handled by the parent ``OffsiteMiddleware``.
    """

    def should_follow(self, request: Request, spider: Spider) -> bool:
        meta = request.meta
        # A request with "zyte_api" in its meta looks like a dependency
        # injection request, and any domain-based filtering should have been
        # handled in the original request handling, before dependency
        # injection.
        if "zyte_api" in meta or meta.get("allow_offsite") is True:
            return True
        return super().should_follow(request, spider)
[docs]
class MaxRequestsPerSeedDownloaderMiddleware:
    """Limit the number of requests that each seed request can subsequently have.

    To enable this middleware, set the ``MAX_REQUESTS_PER_SEED`` setting to
    the desired positive value. Non-positive integers (i.e. 0 and below)
    impose no limit and disable this middleware.

    By default, all start requests are considered seed requests, and all other
    requests are not.

    Please note that you also need to enable TrackSeedsSpiderMiddleware to make
    this work: requests without a ``seed`` value in their metadata are neither
    counted nor dropped by this middleware.
    """

    def __init__(self, crawler: Crawler):
        """Read the limit from the spider settings.

        Raises:
            NotConfigured: if ``MAX_REQUESTS_PER_SEED`` is 0 or negative.
        """
        assert crawler.spider
        # Negative values are clamped to 0, which means "disabled".
        max_requests_per_seed = max(
            0, crawler.spider.settings.getint("MAX_REQUESTS_PER_SEED", 0)
        )
        if not max_requests_per_seed:
            raise NotConfigured
        self.crawler = crawler
        # Number of requests let through so far, keyed by seed.
        self.requests_per_seed: defaultdict = defaultdict(int)
        # Seeds for which at least one request has been dropped.
        self.seeds_reached_limit: Set[str] = set()
        self.max_requests_per_seed = max_requests_per_seed

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        return cls(crawler)

    def process_request(self, request, spider):
        """Count *request* against its seed, or drop it once the limit is hit.

        Returns:
            ``None`` so that the request continues through the downloader.

        Raises:
            IgnoreRequest: if the seed of the request already reached
                ``MAX_REQUESTS_PER_SEED`` requests.
        """
        seed = request.meta.get("seed")
        if seed is None:
            return None

        if self.max_requests_per_seed_reached(seed):
            self.seeds_reached_limit.add(seed)
            # Use the module-level logger (not the root ``logging`` module) so
            # that the message is attributed to this module and follows its
            # logging configuration.
            logger.debug(
                f"The request {request} is skipped as {self.max_requests_per_seed} "
                f"max requests per seed have been reached for seed {seed}."
            )
            assert self.crawler.stats
            self.crawler.stats.set_value(
                "seeds/max_requests_reached", len(self.seeds_reached_limit)
            )
            raise IgnoreRequest("max_requests_per_seed_reached")

        self.requests_per_seed[seed] += 1
        return None

    def max_requests_per_seed_reached(self, seed: str) -> bool:
        """Return whether *seed* already used up its allowed number of requests."""
        # ``.get`` avoids creating defaultdict entries for seeds not seen yet.
        return self.requests_per_seed.get(seed, 0) >= self.max_requests_per_seed
[docs]
class TrackSeedsSpiderMiddleware:
    """Tag requests with the seed they originate from.

    Start requests get ``seed`` (their own URL) and ``is_seed_request``
    (``True``) metadata values, unless those keys are already set. Requests
    found in the spider output keep their own ``seed`` value if they have one
    and otherwise inherit the ``seed`` of the response they come from.
    """

    def __init__(self, crawler: Crawler):
        self.crawler = crawler

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        return cls(crawler)

    def process_start_requests(
        self, start_requests: Iterable[Request], spider: Spider
    ) -> Iterable[Request]:
        for start_request in start_requests:
            meta = start_request.meta
            # ``setdefault`` keeps any value that was set explicitly.
            meta.setdefault("seed", start_request.url)
            meta.setdefault("is_seed_request", True)
            yield start_request

    def process_spider_output(
        self,
        response: Response,
        result: Iterable[Union[Request, Item]],
        spider: Spider,
    ) -> Iterable[Union[Request, Item]]:
        for element in result:
            if isinstance(element, Request):
                yield from self._process_request(element, response)
            else:
                yield element

    async def process_spider_output_async(
        self,
        response: Response,
        result: AsyncIterable[Union[Request, Item]],
        spider: Spider,
    ) -> AsyncIterable[Union[Request, Item]]:
        async for element in result:
            if isinstance(element, Request):
                for tagged_request in self._process_request(element, response):
                    yield tagged_request
            else:
                yield element

    def _process_request(
        self, request: Request, response: Response
    ) -> Iterable[Request]:
        """Yield *request* after propagating the seed of *response* to it."""
        inherited_seed = response.meta.get("seed")
        seed = request.meta.get("seed", inherited_seed)
        # we don't want to add a seed meta key with None if it is not in meta
        if seed is not None:
            request.meta["seed"] = seed
        yield request
class PageParamsMiddlewareBase:
    """Base spider middleware that maintains ``page_params`` in request metadata.

    Every start request and every request found in the spider output gets a
    ``page_params`` dictionary in its metadata (created when missing), which
    is then passed to :meth:`update_page_params` so that subclasses can fill
    it in. Non-request output is passed through untouched.
    """

    def __init__(self, crawler):
        self.crawler = crawler

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def process_start_requests(
        self, start_requests: List[Request], spider: Spider
    ) -> Iterable[Request]:
        for start_request in start_requests:
            self._update_page_params(start_request)
            yield start_request

    def process_spider_output(
        self, response, result, spider
    ) -> Iterable[Union[Request, Item]]:
        for element in result:
            if isinstance(element, Request):
                self._update_page_params(element)
            yield element

    async def process_spider_output_async(
        self, response, result, spider
    ) -> AsyncIterable[Union[Request, Item]]:
        async for element in result:
            if isinstance(element, Request):
                self._update_page_params(element)
            yield element

    def _update_page_params(self, request) -> None:
        """Ensure *request* has ``page_params`` and let the subclass update it."""
        self.update_page_params(request, request.meta.setdefault("page_params", {}))

    def update_page_params(self, request, page_params) -> None:
        """Hook for subclasses to modify *page_params* in place; no-op here."""
[docs]
class TrackNavigationDepthSpiderMiddleware(PageParamsMiddlewareBase):
    """
    This middleware helps manage navigation depth by setting a `final_navigation_page` meta key
    when the predefined depth limit (`NAVIGATION_DEPTH_LIMIT`) is reached.

    .. note::
        Navigation depth is typically increased for requests that navigate to a subcategory
        originating from its parent category, such as a request targeting a category starting
        from the website home page. However, it may not be necessary to increase navigation
        depth, for example, for the next pagination requests.
        Spiders can customize this behavior as needed by controlling when navigation depth is
        incremented (see the ``increase_navigation_depth`` request meta key, which defaults
        to ``True``).
    """

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def __init__(self, crawler):
        """Read ``NAVIGATION_DEPTH_LIMIT`` from the spider settings.

        Raises:
            NotConfigured: if the limit is 0 or negative.
        """
        # Initialize the base class so that ``self.crawler`` is set, as the
        # base class and the other subclasses in this file do.
        super().__init__(crawler)
        # Negative values are clamped to 0, which disables the middleware.
        if max_navigation_depth := max(
            crawler.spider.settings.getint("NAVIGATION_DEPTH_LIMIT", 0), 0
        ):
            self.max_navigation_depth = max_navigation_depth
            self.stats = crawler.stats
        else:
            raise NotConfigured

    def update_page_params(self, request, page_params) -> None:
        """Mirror ``final_navigation_page`` into ``page_params["skip_subcategories"]``.

        When the request has no ``final_navigation_page`` meta key, the
        existing ``skip_subcategories`` value (or ``None``) is kept.
        """
        page_params["skip_subcategories"] = request.meta.get(
            "final_navigation_page", page_params.get("skip_subcategories")
        )

    def process_start_requests(
        self, start_requests: List[Request], spider: Spider
    ) -> Iterable[Request]:
        for request in super().process_start_requests(start_requests, spider):
            # We treat the initial response as having a navigation_depth of 1.
            self._update_request_with_navigation(request, navigation_depth=1)
            self.stats.inc_value("navigation_depth/inits")
            yield request

    def process_spider_output(
        self, response, result, spider
    ) -> Iterable[Union[Request, Item]]:
        for item_or_request in super().process_spider_output(response, result, spider):
            if not isinstance(item_or_request, Request):
                yield item_or_request
                continue
            if req := self._process_navigation_depth(item_or_request, response):
                yield req

    async def process_spider_output_async(
        self, response, result, spider
    ) -> AsyncIterable[Union[Request, Item]]:
        async for item_or_request in super().process_spider_output_async(
            response, result, spider
        ):
            if not isinstance(item_or_request, Request):
                yield item_or_request
                continue
            if req := self._process_navigation_depth(item_or_request, response):
                yield req

    def _update_request_with_navigation(self, request, navigation_depth):
        """Store the depth in the request meta and flag whether the limit is reached.

        Nothing is written when *navigation_depth* is ``None``.
        """
        if navigation_depth is None:
            return
        request.meta["navigation_depth"] = navigation_depth
        request.meta["final_navigation_page"] = (
            navigation_depth >= self.max_navigation_depth
        )

    def _current_navigation_depth(
        self, increase_navigation_depth, current_navigation_depth
    ):
        """Return the depth of the current page, defaulting to 1 when it must be counted."""
        if increase_navigation_depth and current_navigation_depth is None:
            current_navigation_depth = 1
        return current_navigation_depth

    def _process_navigation_depth(self, request, response) -> Optional[Request]:
        """Set the navigation depth metadata of *request* based on *response*.

        Requests with a falsy ``increase_navigation_depth`` meta value inherit
        the depth of the response as is; all other requests get that depth
        plus one.
        """
        increase_navigation_depth = request.meta.get("increase_navigation_depth", True)
        current_navigation_depth = self._current_navigation_depth(
            increase_navigation_depth, response.meta.get("navigation_depth")
        )
        if not increase_navigation_depth:
            # The depth may be None here, in which case the request meta is
            # left untouched.
            self._update_request_with_navigation(request, current_navigation_depth)
            self.stats.inc_value("navigation_depth/not_counted")
            return request
        # From here on the current depth is never None: it either comes from
        # the response or has been defaulted to 1.
        self.stats.inc_value(f"navigation_depth/count/{current_navigation_depth}")
        self.stats.max_value("navigation_depth/max_seen", current_navigation_depth)
        self._update_request_with_navigation(request, current_navigation_depth + 1)
        return request
[docs]
class OnlyFeedsMiddleware(PageParamsMiddlewareBase):
    """
    This middleware helps control whether the spider should discover all links on the webpage
    or extract links from RSS/Atom feeds only.

    It is only active when the ``ONLY_FEEDS_ENABLED`` setting is true, and it
    exposes the decision through the ``only_feeds`` key of ``page_params``.
    """

    def __init__(self, crawler: Crawler):
        super().__init__(crawler)
        assert crawler.spider
        enabled = crawler.spider.settings.getbool("ONLY_FEEDS_ENABLED")  # type: ignore[union-attr]
        if not enabled:
            raise NotConfigured

    def update_page_params(self, request, page_params) -> None:
        # Precedence: the request meta value, then the value already present
        # in page_params, then True.
        fallback = page_params.get("only_feeds", True)
        page_params["only_feeds"] = request.meta.get("only_feeds", fallback)
[docs]
class OffsiteRequestsPerSeedMiddleware:
    """This middleware ensures that subsequent requests for each seed do not go outside
    the original seed's domain.

    However, an offsite request is still allowed if it comes from a response of the
    original domain. Any other offsite requests that follow from offsite responses are
    not allowed. This behavior allows crawling articles from news aggregator websites
    while ensuring that the other domains it discovers are not fully crawled.

    When this middleware is disabled it filters nothing, so other domains might end up
    being crawled completely, unless ``allowed_domains`` is set in the spider.

    This middleware relies on :class:`~zyte_spider_templates.TrackSeedsSpiderMiddleware`
    to set the `"seed"` and `"is_seed_request"` values in
    :attr:`Request.meta <scrapy.http.Request.meta>`. Ensure that such middleware is
    active and sets the said values before this middleware processes the spider output.

    .. note::

        If a seed URL gets redirected to a different domain, both the domain from
        the original request and the domain from the redirected response will be
        used as references.

        If the seed URL is `https://books.toscrape.com`, all subsequent requests to
        `books.toscrape.com` and its subdomains are allowed, but requests to
        `toscrape.com` are not. Conversely, if the seed URL is `https://toscrape.com`,
        requests to both `toscrape.com` and `books.toscrape.com` are allowed.
    """

    def __init__(self, crawler: Crawler):
        assert crawler.spider
        enabled = crawler.spider.settings.getbool(  # type: ignore[union-attr]
            "OFFSITE_REQUESTS_PER_SEED_ENABLED", True
        )
        if not enabled:
            raise NotConfigured
        self.stats = crawler.stats
        # Domains that requests of a given seed are allowed to target.
        self.allowed_domains_per_seed: Dict[str, Set[str]] = defaultdict(set)
        # Host names of filtered requests, used to count each one only once.
        self.domains_seen: Set[str] = set()

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def process_spider_output(
        self,
        response: Response,
        result: Iterable[Union[Request, Item]],
        spider: Spider,
    ) -> Iterable[Union[Request, Item]]:
        self._fill_allowed_domains_per_seed_dict(response)
        for element in result:
            if isinstance(element, Request) and not self.allow_request(
                element, response
            ):
                continue
            yield element

    async def process_spider_output_async(
        self,
        response: Response,
        result: AsyncIterable[Union[Request, Item]],
        spider: Spider,
    ) -> AsyncIterable[Union[Request, Item]]:
        self._fill_allowed_domains_per_seed_dict(response)
        async for element in result:
            if isinstance(element, Request) and not self.allow_request(
                element, response
            ):
                continue
            yield element

    def allow_request(self, request: Request, response: Response) -> bool:
        """Return whether *request*, found in *response*, may be followed.

        Filtered requests are counted in the crawler stats.
        """
        # If the request itself is offsite, we don't want to immediately filter
        # it as it might be an article from news aggregator websites. So, we
        # also check if the request came from the original website. Otherwise,
        # it came from offsite pages and we avoid it.
        if (
            request.dont_filter
            or self._is_domain_per_seed_allowed(request)
            or self._is_domain_per_seed_allowed(response)
        ):
            return True

        domain = urlparse_cached(request).hostname
        assert self.stats
        is_new_domain = bool(domain) and domain not in self.domains_seen
        if is_new_domain:
            self.domains_seen.add(domain)
            self.stats.inc_value("offsite_requests_per_seed/domains")
        self.stats.inc_value("offsite_requests_per_seed/filtered")
        logger.debug(f"Filtered offsite request per seed to {domain}: {request}")
        return False

    def _fill_allowed_domains_per_seed_dict(self, response: Response) -> None:
        """Register the allowed domains that *response* contributes to its seed.

        Responses to seed requests contribute their ``seed_domains`` meta value
        or, when that key is missing, the domains extracted from the response.
        Other responses only contribute a non-empty ``seed_domains`` value.
        """
        meta = response.meta
        seed = meta.get("seed")
        if seed is None:
            return

        if meta.get("is_seed_request"):
            # The fallback is computed eagerly, even when "seed_domains" is set.
            domains = meta.get("seed_domains", self._get_allowed_domains(response))
        else:
            domains = meta.get("seed_domains")
            if not domains:
                return
        self.allowed_domains_per_seed[seed].update(domains)

    def _is_domain_per_seed_allowed(
        self, req_or_resp: Union[Request, Response]
    ) -> bool:
        """Return whether the URL belongs to the domains allowed for its seed.

        Objects without a seed are always allowed; objects whose seed has no
        allowed domains registered are not.
        """
        seed = req_or_resp.meta.get("seed")
        if seed is None:
            return True
        allowed_domains = self.allowed_domains_per_seed.get(seed)
        if not allowed_domains:
            return False
        return url_is_from_any_domain(req_or_resp.url, allowed_domains)

    def _get_allowed_domains(self, response: Response) -> Set[str]:
        """
        Returns the domains based on the URL attributes of items from a response and the originating request.

        In cases where the original request URL was redirected to a new domain,
        the new domain would be included as well.
        """

        def get_item_and_request_urls() -> Generator[str, None, None]:
            """Since the redirected URL and canonicalUrl are only in the Item,
            we try to extract it from the first item encountered."""
            for maybe_item in response.cb_kwargs.values():
                if isinstance(maybe_item, DynamicDeps):
                    for item_class in [Article, ArticleNavigation]:
                        if item := maybe_item.get(item_class):
                            for url_type in ("canonicalUrl", "url"):
                                if url := getattr(item, url_type, None):
                                    yield url
                            # Only the first item found is used.
                            break
                else:
                    logger.debug(
                        f"This type of item: {type(maybe_item)} is not allowed"
                    )
            assert response.request
            yield response.request.url

        return {get_domain(url) for url in get_item_and_request_urls()}
class DummyDupeFilter(RFPDupeFilter):
    """
    Duplicate filter that never reports a request as already seen.

    Overriding `request_seen` to always return `False` disables the duplicate
    filtering that this filter would otherwise perform. Deduplication is meant
    to happen elsewhere instead, e.g. in `DupeFilterSpiderMiddleware`, before
    requests are passed to other middlewares.
    """

    def request_seen(self, request: Request) -> bool:
        # Every request is treated as new, whatever was seen before.
        return False
class DupeFilterSpiderMiddleware:
    """
    This middleware uses a custom duplicate filter to override Scrapy's default filtering,
    leveraging the `DummyDupeFilter` to bypass global deduplication. Instead,
    deduplication is managed within the middleware itself, filtering out duplicate requests
    before they reach other middlewares.

    Requests with ``dont_filter`` set are never dropped. Every dropped request
    increases the ``dupe_filter_spider_mw/url_already_seen`` stat.
    """

    # Class-level default kept for backward compatibility (e.g. code reading
    # the attribute on the class). Instances do not share it: each one gets
    # its own filter in ``__init__``.
    dupe_filter: RFPDupeFilter = RFPDupeFilter()

    def __init__(self, crawler):
        self.crawler = crawler
        # The filter remembers the requests it has seen, so it must be created
        # per instance. Otherwise, requests seen by one crawler would be
        # dropped as duplicates by any other crawler in the same process.
        self.dupe_filter = RFPDupeFilter()

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def process_start_requests(
        self, start_requests: List[Request], spider: Spider
    ) -> Iterable[Request]:
        for request in start_requests:
            if not self.url_already_seen(None, request):
                yield request

    def process_spider_output(
        self, response, result, spider
    ) -> Iterable[Union[Request, Item]]:
        for item_or_request in result:
            if isinstance(item_or_request, Request):
                if not self.url_already_seen(response, item_or_request):
                    yield item_or_request
            else:
                yield item_or_request

    async def process_spider_output_async(
        self, response, result, spider
    ) -> AsyncIterable[Union[Request, Item]]:
        async for item_or_request in result:
            if isinstance(item_or_request, Request):
                if not self.url_already_seen(response, item_or_request):
                    yield item_or_request
            else:
                yield item_or_request

    def url_already_seen(self, response: Optional[Response], request: Request) -> bool:
        """A custom replacement for the default duplicate filtering, tracking URLs seen in this run.

        Returns ``True`` when *request* is a duplicate that must be dropped.
        *response* is only used for logging; ``None`` denotes a start request.
        """
        # ``dont_filter`` is checked first, so such requests are not even
        # recorded by the duplicate filter.
        if not request.dont_filter and self.dupe_filter.request_seen(request):
            logger.debug(
                f"URL is duplicated {request.url}, for the response {response.url if response else 'start_request'}."
            )
            self.crawler.stats.inc_value("dupe_filter_spider_mw/url_already_seen")
            return True
        return False