import json
from datetime import datetime, timedelta
from operator import attrgetter, itemgetter
from typing import List, Mapping
from urllib.parse import urlparse
from pytz import timezone
from scrapy import Spider, signals
from scrapy.crawler import Crawler
from scrapy.exceptions import DontCloseSpider, DropItem
from scrapy.http import Response
from ..constants import CANCELLED
from ..items import Meeting
class DiffPipeline:
    """Class for loading and comparing previous feed export results in OCD format.

    Either merges UIDs for consistency or marks upcoming meetings that no longer
    appear as cancelled.

    Provider-specific backends can be created by subclassing and implementing the
    `load_previous_results` method.
    """

    def __init__(self, crawler: Crawler, output_format: str):
        """Initialize a DiffPipeline object, setting the crawler and output format

        :param crawler: Current Crawler object
        :param output_format: Currently only "ocd" is supported
        """
        self.crawler = crawler
        self.output_format = output_format

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        """Classmethod for creating a pipeline object from a Crawler

        Loads the previous results onto the spider and builds a map from scraper
        IDs to previously assigned "_id" values. It also registers
        :meth:`spider_idle` so previous results are processed once scraping ends.

        :param crawler: Crawler currently being run
        :raises ValueError: Raises an error if an output format is not supplied
        :return: Instance of DiffPipeline
        """
        pipelines = crawler.settings.get("ITEM_PIPELINES", {})
        if "city_scrapers_core.pipelines.OpenCivicDataPipeline" in pipelines:
            output_format = "ocd"
        else:
            raise ValueError(
                "An output format pipeline must be enabled for diff middleware"
            )
        pipeline = cls(crawler, output_format)
        spider = crawler.spider
        spider._previous_results = pipeline.load_previous_results()
        # Maps scraper-generated meeting IDs to the "_id" from the previous export.
        # process_item always reads this, so it is initialized for every format.
        spider._previous_map = {}
        if output_format == "ocd":
            for result in spider._previous_results:
                extras_dict = result.get("extras") or result.get("extra") or {}
                previous_id = extras_dict.get("cityscrapers.org/id")
                # A result without a scraper ID can never match a current item
                if previous_id is not None:
                    spider._previous_map[previous_id] = result["_id"]
        spider._scraped_ids = set()
        crawler.signals.connect(pipeline.spider_idle, signal=signals.spider_idle)
        return pipeline

    @staticmethod
    def _now_str(spider: Spider) -> str:
        """Return the current time as a "YYYY-MM-DDTHH:MM:SS" string.

        Uses the spider's ``timezone`` attribute when present, so the value can be
        compared with naive local "start" strings from previous exports. Falls
        back to the server's local time when the spider has no timezone.

        :param spider: Spider currently being scraped
        :return: Current wall-clock time without offset or microseconds
        """
        tz_name = getattr(spider, "timezone", None)
        now = datetime.now(timezone(tz_name)) if tz_name else datetime.now()
        return now.strftime("%Y-%m-%dT%H:%M:%S")

    def process_item(self, item: Mapping, spider: Spider) -> Mapping:
        """Processes Item objects or general dict-like objects and compares them to
        previously scraped values.

        :param item: Dict-like item to process from a scraper
        :param spider: Spider currently being scraped
        :raises DropItem: Drops items with IDs that have been already scraped
        :raises DropItem: Drops items that are in the past, have no start value,
            or were already scraped
        :return: Returns the item, merged with previous values if found
        """
        id_key = "_id"
        # Current items: Meeting objects, or dicts that don't carry a uid yet
        if isinstance(item, Meeting) or (isinstance(item, dict) and id_key not in item):
            if item["id"] in spider._scraped_ids:
                raise DropItem("Item has already been scraped")
            spider._scraped_ids.add(item["id"])
            if item["id"] in spider._previous_map:
                # Bypass __setitem__ call on Meeting to add uid
                if isinstance(item, Meeting):
                    item._values[id_key] = spider._previous_map[item["id"]]
                else:
                    item[id_key] = spider._previous_map[item["id"]]
            return item

        # Otherwise this is a previous result (it already has an "_id")
        scraper_id = ""
        if self.output_format == "ocd":
            extras_dict = item.get("extras") or item.get("extra") or {}
            scraper_id = extras_dict.get("cityscrapers.org/id", "")
        start = item.get("start", item.get("start_time"))
        # Drop items that are already included, have no start, or are in the past
        if (
            scraper_id in spider._scraped_ids
            or start is None
            or start < self._now_str(spider)
        ):
            raise DropItem("Previous item is in scraped results or the past")
        # If the item is upcoming and not scraped, mark it cancelled
        spider._scraped_ids.add(scraper_id)
        return {**item, "status": CANCELLED}

    def spider_idle(self, spider: Spider):
        """Add _previous_results to spider queue when current results finish

        Disconnects itself first, so previous results are only re-queued once.

        :param spider: Spider being scraped
        :raises DontCloseSpider: Makes sure spider isn't closed to make sure prior
            results are processed
        """
        scraper = self.crawler.engine.scraper
        self.crawler.signals.disconnect(self.spider_idle, signal=signals.spider_idle)
        for item in spider._previous_results:
            # NOTE(review): _process_spidermw_output is a private Scrapy API; its
            # signature may differ between Scrapy versions — confirm on upgrade.
            scraper._process_spidermw_output(item, None, Response(""), spider)
        raise DontCloseSpider

    def load_previous_results(self) -> List[Mapping]:
        """Method that must be implemented for loading previously-scraped results

        :raises NotImplementedError: Required to be implemented on subclasses
        :return: Items previously scraped and loaded from a storage backend
        """
        raise NotImplementedError
class AzureDiffPipeline(DiffPipeline):
    """Implements :class:`DiffPipeline` for Azure Blob Storage"""

    def __init__(self, crawler: Crawler, output_format: str):
        """Initialize :class:`AzureDiffPipeline` from a crawler and set account values

        Reads the account name, account key and container out of FEED_URI.

        :param crawler: Current Crawler object
        :param output_format: Currently only "ocd" is supported
        """
        from azure.storage.blob import ContainerClient

        settings = crawler.settings
        feed_uri = settings.get("FEED_URI")
        # Skip the first 8 characters (presumably an "azure://" scheme), then read
        # "<account_name>:<account_key>" from the part before the "@"
        credential_part = feed_uri[8::].split("@")[0]
        account_name, account_key = credential_part.split(":")
        # The container is the first path segment after the "@"
        location_part = feed_uri.split("@")[1]
        self.container = location_part.split("/")[0]
        self.spider = crawler.spider
        self.container_client = ContainerClient(
            f"{account_name}.blob.core.windows.net",
            self.container,
            credential=account_key,
        )
        self.feed_prefix = settings.get("CITY_SCRAPERS_DIFF_FEED_PREFIX", "%Y/%m/%d")
        super().__init__(crawler, output_format)

    def load_previous_results(self) -> List[Mapping]:
        """Loads previously scraped items on Azure Blob Storage

        Looks at today's prefix first, then up to 3 earlier days. It uses the
        first day that has a blob whose name contains "<spider name>.", and
        parses the lexically greatest such blob as JSON lines.

        :return: Previously scraped results, or an empty list if none are found
        """
        max_days_back = 3
        tz = timezone(self.spider.timezone)
        name_marker = f"{self.spider.name}."
        candidates = []
        for days_back in range(max_days_back + 1):
            day = tz.localize(datetime.now()) - timedelta(days=days_back)
            listing = self.container_client.list_blobs(
                name_starts_with=day.strftime(self.feed_prefix)
            )
            candidates = [blob for blob in listing if name_marker in blob.name]
            if candidates:
                break
        if not candidates:
            return []
        latest = max(candidates, key=attrgetter("name"))
        blob_client = self.container_client.get_blob_client(latest.name)
        raw_text = blob_client.download_blob().content_as_text()
        records = []
        for line in raw_text.split("\n"):
            if line.strip():
                records.append(json.loads(line))
        return records
class S3DiffPipeline(DiffPipeline):
    """Implements :class:`DiffPipeline` for AWS S3"""

    def __init__(self, crawler: Crawler, output_format: str):
        """Initialize :class:`S3DiffPipeline` from crawler

        Reads the bucket from FEED_URI and AWS credentials from settings.

        :param crawler: Current Crawler object
        :param output_format: Only "ocd" is supported
        """
        import boto3

        parsed = urlparse(crawler.settings.get("FEED_URI"))
        self.spider = crawler.spider
        self.feed_prefix = crawler.settings.get(
            "CITY_SCRAPERS_DIFF_FEED_PREFIX", "%Y/%m/%d"
        )
        self.bucket = parsed.netloc
        self.client = boto3.client(
            "s3",
            aws_access_key_id=crawler.settings.get("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=crawler.settings.get("AWS_SECRET_ACCESS_KEY"),
        )
        super().__init__(crawler, output_format)

    def load_previous_results(self) -> List[Mapping]:
        """Load previously scraped items on AWS S3

        Looks at today's prefix first, then up to 3 earlier days. It uses the
        first day that has an object whose key contains "<spider name>.", and
        parses the lexically greatest such object as JSON lines. Every page of
        the listing is read, so days with more than 1,000 objects are handled.

        :return: Previously scraped results, or an empty list if none are found
        """
        max_days_previous = 3
        tz = timezone(self.spider.timezone)
        name_marker = f"{self.spider.name}."
        # A single list_objects call returns at most 1,000 keys, so page through
        # the whole prefix to avoid missing this spider's feed on busy days
        paginator = self.client.get_paginator("list_objects")
        spider_objects = []
        for days_previous in range(max_days_previous + 1):
            prefix = (
                tz.localize(datetime.now()) - timedelta(days=days_previous)
            ).strftime(self.feed_prefix)
            spider_objects = [
                obj
                for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix)
                for obj in page.get("Contents", [])
                if name_marker in obj["Key"]
            ]
            if spider_objects:
                break
        if not spider_objects:
            return []
        obj = sorted(spider_objects, key=itemgetter("Key"))[-1]
        feed_text = (
            self.client.get_object(Bucket=self.bucket, Key=obj["Key"])
            .get("Body")
            .read()
            .decode("utf-8")
        )
        return [json.loads(line) for line in feed_text.split("\n") if line.strip()]
class GCSDiffPipeline(DiffPipeline):
    """Implements :class:`DiffPipeline` for Google Cloud Storage"""

    def __init__(self, crawler: Crawler, output_format: str):
        """Initialize :class:`GCSDiffPipeline` from crawler

        The bucket name is taken from the network location of FEED_URI.

        :param crawler: Current Crawler object
        :param output_format: Only "ocd" is supported
        """
        from google.cloud import storage

        settings = crawler.settings
        feed_location = urlparse(settings.get("FEED_URI"))
        self.spider = crawler.spider
        self.feed_prefix = settings.get("CITY_SCRAPERS_DIFF_FEED_PREFIX", "%Y/%m/%d")
        self.bucket_name = feed_location.netloc
        self.client = storage.Client()
        self.bucket = self.client.bucket(self.bucket_name)
        super().__init__(crawler, output_format)

    def load_previous_results(self) -> List[Mapping]:
        """Load previously scraped items on Google Cloud Storage

        Looks at today's prefix first, then up to 3 earlier days. It uses the
        first day that has a blob whose name contains "<spider name>.", and
        parses the lexically greatest such blob as JSON lines.

        :return: Previously scraped results, or an empty list if none are found
        """
        max_days_back = 3
        tz = timezone(self.spider.timezone)
        name_marker = f"{self.spider.name}."
        candidates = []
        for days_back in range(max_days_back + 1):
            day = tz.localize(datetime.now()) - timedelta(days=days_back)
            listing = self.client.list_blobs(
                self.bucket_name, prefix=day.strftime(self.feed_prefix)
            )
            candidates = [blob for blob in listing if name_marker in blob.name]
            if candidates:
                break
        if not candidates:
            return []
        latest = max(candidates, key=attrgetter("name"))
        raw_bytes = self.bucket.blob(latest.name).download_as_bytes()
        raw_text = raw_bytes.decode("utf-8")
        records = []
        for line in raw_text.split("\n"):
            if line.strip():
                records.append(json.loads(line))
        return records