Source code for city_scrapers_core.spiders.legistar

from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List, Union
from urllib.parse import parse_qs, urlencode

import scrapy

from ..items import Meeting
from .spider import CityScrapersSpider

# NOTE(review): presumably the labels of document-link columns found on Legistar
# event tables. Nothing in the visible part of this module references this constant
# (the spider class defines its own, empty ``link_types``) -- confirm which callers
# consume it before changing or removing it.
LINK_TYPES = ["Agenda", "Minutes", "Video", "Summary", "Captions"]


class LegistarSpider(CityScrapersSpider):
    """Subclass of :class:`CityScrapersSpider` that handles processing Legistar sites,
    which almost always share the same components and general structure.

    Any methods that don't pull the correct values can be replaced.
    """

    link_types = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Can override since_year to start earlier
        self.since_year = datetime.now().year - 1
        # iCalendar URLs of events already collected; used to drop duplicate rows
        # that show up again on later pages or in other year queries.
        self._scraped_urls = set()

    def parse(self, response: scrapy.http.Response) -> Iterable[scrapy.Request]:
        """Creates initial event requests for each queried year.

        One POST request is issued per year from ``since_year`` through the current
        year (inclusive), each selecting that year in the calendar's year list.

        :param response: Response for the Legistar calendar page; its hidden form
                         fields and URL are reused for the year requests
        :return: Iterable of ``Request`` objects for event pages
        """
        secrets = self._parse_secrets(response)
        current_year = datetime.now().year
        for year in range(self.since_year, current_year + 1):
            yield scrapy.Request(
                response.url,
                method="POST",
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                body=urlencode(
                    {
                        **secrets,
                        "__EVENTTARGET": "ctl00$ContentPlaceHolder1$lstYears",
                        "ctl00_ContentPlaceHolder1_lstYears_ClientState": f'{{"value":"{year}"}}',  # noqa
                    }
                ),
                callback=self._parse_legistar_events_page,
                # Every request targets the same URL, so the duplicate filter
                # must be bypassed.
                dont_filter=True,
            )

    def parse_legistar(self, events: Iterable[Dict]) -> Iterable[Meeting]:
        """Method to be implemented by Spider classes that will handle the response
        from Legistar. Functions similar to ``parse`` for other Spider classes.

        :param events: Iterable consisting of a dict of scraped results from Legistar
        :raises NotImplementedError: Must be implemented in subclasses
        :return: ``Meeting`` objects that will be passed to pipelines, output
        """
        raise NotImplementedError("Must implement parse_legistar")

    def legistar_start(self, item: Dict) -> Union[datetime, None]:
        """Pulls the start time from a Legistar item.

        Reads the ``"Meeting Date"`` and ``"Meeting Time"`` values. If the combined
        value cannot be parsed as date plus time, only the date is parsed.

        :param item: Scraped item from Legistar
        :raises ValueError: If the date itself does not match ``%m/%d/%Y``
        :return: Meeting start datetime, or ``None`` when either the date or the
                 time is missing from the item
        """
        start_date = item.get("Meeting Date")
        start_time = item.get("Meeting Time")
        if not (start_date and start_time):
            # Explicit so callers can tell "no start available" apart from a value.
            return None
        try:
            return datetime.strptime(
                f"{start_date} {start_time}", "%m/%d/%Y %I:%M %p"
            )
        except ValueError:
            # Time is present but not in the expected format: keep the date only.
            return datetime.strptime(start_date, "%m/%d/%Y")

    def legistar_source(self, item: Dict) -> str:
        """Pulls the source URL from a Legistar item. Pulls a specific meeting URL if
        available, otherwise defaults to the general Legistar calendar page.

        :param item: Scraped item from Legistar
        :return: Source URL
        """
        default_url = self.start_urls[0]
        # Cells that contained a link are stored as dicts with a "url" key.
        for key in ("Name", "Meeting Details"):
            cell = item.get(key)
            if isinstance(cell, dict):
                return cell.get("url", default_url)
        return default_url

    def _parse_legistar_events_page(
        self, response: scrapy.http.Response
    ) -> Iterable[Union[Meeting, scrapy.http.Request]]:
        """Callback for an events page: yields the meetings parsed from it, followed
        by the request for the next page if there is one.

        :param response: Response for a page of the Legistar events table
        :return: Iterable of ``Meeting`` objects and follow-up ``Request`` objects
        """
        legistar_events = self._parse_legistar_events(response)
        yield from self.parse_legistar(legistar_events)
        yield from self._parse_next_page(response)

    def _parse_legistar_events(self, response: scrapy.http.Response) -> Iterable[Dict]:
        """Extracts one dict per new event row of the events table.

        Rows without an iCalendar URL, and rows whose iCalendar URL has already been
        collected by this spider, are skipped. Rows that fail to parse are logged
        and skipped so a single malformed row does not lose the whole page.

        :param response: Response for a page of the Legistar events table
        :raises IndexError: If the page has no ``table.rgMasterTable`` element
        :return: List of dicts mapping column header to cell value
        """
        events_table = response.css("table.rgMasterTable")[0]
        headers = self._parse_legistar_headers(events_table)

        events = []
        for row in events_table.css("tr.rgRow, tr.rgAltRow"):
            try:
                data = self._parse_legistar_row(response, headers, row)
            except Exception:
                # Deliberately best-effort, but never silent.
                self.logger.warning(
                    "Skipping Legistar event row that could not be parsed",
                    exc_info=True,
                )
                continue

            # The iCalendar cell may be plain text when it carries no link.
            ical = data.get("iCalendar")
            ical_url = ical.get("url") if isinstance(ical, dict) else None
            if ical_url is None or ical_url in self._scraped_urls:
                continue
            self._scraped_urls.add(ical_url)
            events.append(data)
        return events

    def _parse_legistar_headers(self, events_table) -> list:
        """Returns the column labels of the events table, in column order.

        Falls back from the header text to the value of its first ``input`` and
        finally to the ``alt`` text of its first ``img``.

        :param events_table: Selector for the events table
        :return: List of header labels
        """
        headers = []
        for header in events_table.css("th[class^='rgHeader']"):
            header_text = (
                " ".join(header.css("*::text").extract())
                .replace("\xa0", " ")  # non-breaking spaces -> regular spaces
                .strip()
            )
            header_inputs = header.css("input")
            if header_text:
                headers.append(header_text)
            elif header_inputs:
                headers.append(header_inputs[0].attrib["value"])
            else:
                headers.append(header.css("img")[0].attrib["alt"])
        return headers

    def _parse_legistar_row(
        self, response: scrapy.http.Response, headers: list, row
    ) -> Dict:
        """Converts one table row into a dict keyed by column header.

        Cells without a link map to their text. Cells with a link map to
        ``{"label": text, "url": url}``, except the iCalendar cell which maps to
        ``{"url": url}`` under the key ``"iCalendar"``.

        :param response: Response used to resolve relative URLs
        :param headers: Column labels, in column order
        :param row: Selector for the table row
        :return: Dict mapping column header to cell value
        """
        data = {}
        for header, field in zip(headers, row.css("td")):
            field_text = (
                " ".join(field.css("*::text").extract())
                .replace("\xa0", " ")  # non-breaking spaces -> regular spaces
                .strip()
            )
            url = self._parse_legistar_cell_url(response, field)
            if not url:
                value = field_text
            elif header in ["", "ics"] and "View.ashx?M=IC" in url:
                # Normalise the unlabelled/"ics" calendar column to a stable key.
                header = "iCalendar"
                value = {"url": url}
            else:
                value = {"label": field_text, "url": url}
            data[header] = value
        return data

    def _parse_legistar_cell_url(
        self, response: scrapy.http.Response, field
    ) -> Union[str, None]:
        """Returns the absolute URL of the first link in a table cell, if any.

        A URL embedded in a recognised popup-opening ``onclick`` handler takes
        precedence over the ``href`` attribute.

        :param response: Response used to resolve relative URLs
        :param field: Selector for the table cell
        :return: Absolute URL, or ``None`` when the cell has no usable link
        """
        links = field.css("a")
        if not links:
            return None
        attrib = links[0].attrib
        if "onclick" in attrib and attrib["onclick"].startswith(
            ("radopen('", "window.open", "OpenTelerikWindow")
        ):
            # The target is the first single-quoted argument of the handler.
            return response.urljoin(attrib["onclick"].split("'")[1])
        if "href" in attrib:
            return response.urljoin(attrib["href"])
        return None

    def _parse_next_page(
        self, response: scrapy.http.Response
    ) -> Iterable[scrapy.Request]:
        """Yields the request for the page following the current one, if any.

        The previous request's form fields are carried over, with the hidden form
        fields refreshed from ``response`` and the pager link as the event target.

        :param response: Response for a page of the Legistar events table
        :return: Iterable with at most one ``Request`` for the next page
        """
        next_page_link = response.css("a.rgCurrentPage + a")
        if len(next_page_link) == 0:
            return
        # The href is a javascript postback call; its first quoted argument is the
        # event target.
        event_target = next_page_link[0].attrib["href"].split("'")[1]
        next_page_payload = {
            **parse_qs(response.request.body.decode("utf-8")),
            **self._parse_secrets(response),
            "__EVENTTARGET": event_target,
        }
        yield scrapy.Request(
            response.url,
            method="POST",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            # parse_qs returns every value as a list; doseq=True encodes the list
            # elements rather than the list's string representation.
            body=urlencode(next_page_payload, doseq=True),
            callback=self._parse_legistar_events_page,
            dont_filter=True,
        )

    def _parse_secrets(self, response: scrapy.http.Response) -> Dict:
        """Collects the hidden form fields that must accompany each POST request.

        :param response: Response for a Legistar page containing the form
        :raises IndexError: If the page has no ``__VIEWSTATE`` field
        :return: Dict of form field names to values; ``__EVENTVALIDATION`` is only
                 included when present on the page
        """
        secrets = {
            "__EVENTARGUMENT": None,
            "__VIEWSTATE": response.css("[name='__VIEWSTATE']")[0].attrib["value"],
        }
        event_validation = response.css("[name='__EVENTVALIDATION']")
        if len(event_validation) > 0:
            secrets["__EVENTVALIDATION"] = event_validation[0].attrib["value"]
        return secrets