diff --git a/frontools/__init__.py b/frontools/__init__.py index 710b708..f79fa14 100644 --- a/frontools/__init__.py +++ b/frontools/__init__.py @@ -2,6 +2,6 @@ from .config import Config, SiteConfig from .sources import get_page_stylesheets -from .utils import report_progress +from .utils import report_progress __all__ = ["Config", "SiteConfig", "report_progress", "get_page_stylesheets"] diff --git a/frontools/browser.py b/frontools/browser.py index fd8021a..0f6dff5 100644 --- a/frontools/browser.py +++ b/frontools/browser.py @@ -3,12 +3,14 @@ from contextlib import asynccontextmanager from typing import AsyncGenerator from playwright.async_api import BrowserContext, Route, async_playwright + from frontools.sources import Source @asynccontextmanager async def get_cached_browser(source: Source) -> AsyncGenerator[BrowserContext, None]: """Return a Playwright browser that will eventually get files from local cache""" + async def _cache_route(route: Route) -> None: content = await source.get_url(route.request.url) await route.fulfill(body=content) diff --git a/frontools/cache.py b/frontools/cache.py index befd90c..6315ec0 100644 --- a/frontools/cache.py +++ b/frontools/cache.py @@ -2,10 +2,10 @@ from abc import ABC, abstractmethod from pathlib import Path from pickle import dumps, loads +from shutil import rmtree from typing import Awaitable, Callable, Generic, TypeVar, Union, cast from click import echo -from shutil import rmtree from xdg import xdg_cache_home ResourceType = TypeVar("ResourceType") @@ -39,9 +39,9 @@ class Cache(Generic[ResourceType], ABC): for cache_name in cache_names: cache_path: Path = Cache.cache_base / cache_name if not cache_path.is_dir(): - echo(f'{cache_path} isn\'t a chache directory', err=True) + echo(f"{cache_path} isn't a chache directory", err=True) continue - echo(f'Removing {cache_path}') + echo(f"Removing {cache_path}") rmtree(cache_path) @staticmethod @@ -75,7 +75,9 @@ class FileCache(Cache[ResourceType]): self._name = name async def get( - self, key: str, fallback: CacheFallback[ResourceType] + self, + key: str, + fallback: CacheFallback[ResourceType], ) -> ResourceType: """Get an item in the cache, call fallback if it's not present""" cache_file_path = self._get_cache_file_path(key) @@ -100,7 +102,7 @@ class FileCache(Cache[ResourceType]): key_slug = _get_key_slug(key) cache_directory = self.cache_base / self._name file_path = cache_directory.joinpath(*key_slug.split("&")) - file_path = file_path.parent / (file_path.name[:255] + '_') + file_path = file_path.parent / (file_path.name[:254] + "_") file_directory = file_path.parent if not file_directory.is_dir(): diff --git a/frontools/config.py b/frontools/config.py index 6a0b7bf..6db336c 100644 --- a/frontools/config.py +++ b/frontools/config.py @@ -2,6 +2,7 @@ from gettext import gettext as _ from importlib.util import module_from_spec, spec_from_file_location from pathlib import Path +from re import Pattern from re import compile as re_compile from typing import Iterable, Optional @@ -40,6 +41,7 @@ class Config: self._use_cache = use_cache self._sources: dict[str, Source] = {} self._sites: dict[str, SiteConfig] = {} + self._block_urls: list[Pattern[str]] = [] if default_source_name is None: self._default_source_name = REMOTE_SOURCE_NAME @@ -49,7 +51,8 @@ class Config: self._error_summary = ErrorSummary() remote_cache = self.get_data_cache(REMOTE_SOURCE_NAME) self._add_source( - REMOTE_SOURCE_NAME, CachedSource(self._error_summary, remote_cache) + REMOTE_SOURCE_NAME, + CachedSource(self._error_summary, self._block_urls, remote_cache), ) self._include_urls = [re_compile(it) for it in include_urls] self._exclude_urls = [re_compile(it) for it in exclude_urls] @@ -123,6 +126,11 @@ class Config: self._sites[name].urls.append(url) + def block_url_patterns(self, *patterns: str) -> None: + """Will return 500 error for urls matching this pattern.""" + for pattern in patterns: + self._block_urls.append(re_compile(pattern)) + def get_data_cache(self, name: str) -> Cache[bytes]: """Get a data cache with the given identifier""" if self._use_cache: @@ -149,7 +157,9 @@ class Config: next_source = self.default_source else: next_source = self.get_source(next_source_name) - self._sources[name] = OverrideSource(self._error_summary, mappings, next_source) + self._sources[name] = OverrideSource( + self._error_summary, self._block_urls, mappings, next_source + ) def get_source(self, name: str) -> Source: """Get an alternate source in the configured ones""" diff --git a/frontools/css.py b/frontools/css.py index dda437c..fe0f03e 100644 --- a/frontools/css.py +++ b/frontools/css.py @@ -1,13 +1,18 @@ """Css related functions""" from enum import Enum -from typing import Iterable -from typing import AsyncIterable, Iterator +from typing import AsyncIterable, Iterable, Iterator from urllib.parse import urljoin from bs4 import BeautifulSoup from tinycss2 import parse_stylesheet +from tinycss2.ast import ( + IdentToken, + LiteralToken, + Node, + SquareBracketsBlock, + WhitespaceToken, +) from tinycss2.parser import QualifiedRule -from tinycss2.ast import WhitespaceToken, LiteralToken, IdentToken, Node, SquareBracketsBlock from frontools.sources import Source @@ -19,8 +24,12 @@ async def css_diff(url: str, left_source: Source, right_source: Source) -> None: if css_url is None: continue right_stylesheet_content = await right_source.get_url(urljoin(url, css_url)) - left_sheet = parse_stylesheet(left_stylesheet_content.decode('utf-8'), True, True) - right_sheet = parse_stylesheet(right_stylesheet_content.decode('utf-8'), True, True) + left_sheet = parse_stylesheet( + left_stylesheet_content.decode("utf-8"), True, True + ) + right_sheet = parse_stylesheet( + right_stylesheet_content.decode("utf-8"), True, True + ) left_selector_index = _get_selector_index(left_sheet) right_selector_index = _get_selector_index(right_sheet) @@ -49,7 +58,7 @@ class Combinator(Enum): class Selector: def __init__(self) -> None: self._classes: set[str] = set() - self._combined = dict[Combinator, 'Selector'] + self._combined = dict[Combinator, "Selector"] def _parse_selector(token_iterator: Iterator[Node]) -> Selector: @@ -57,26 +66,26 @@ def _parse_selector(token_iterator: Iterator[Node]) -> Selector: classes: set[str] = set() child_selectors: dict[Combinator, Selector] = {} token = next(token_iterator) - tag = '' + tag = "" attributes: dict[str, str] = {} while True: - while(isinstance(token, WhitespaceToken)): + while isinstance(token, WhitespaceToken): token = next(token_iterator) if isinstance(token, LiteralToken): - if token.value == '.': + if token.value == ".": while True: token = next(token_iterator) assert isinstance(token, IdentToken) classes.add(token.value) token = next(token_iterator) - if not isinstance(token, LiteralToken) or token.value != '.': + if not isinstance(token, LiteralToken) or token.value != ".": break else: combinator_mappings = { - '+': Combinator.ADJACENT_SIBLING, - '>': Combinator.DIRECT_CHILD, - '~': Combinator.GENERAL_SIBLING + "+": Combinator.ADJACENT_SIBLING, + ">": Combinator.DIRECT_CHILD, + "~": Combinator.GENERAL_SIBLING, } if token.value in combinator_mappings: combinator = combinator_mappings[token.value] diff --git a/frontools/sources.py b/frontools/sources.py index 0e948d4..2f3cda6 100644 --- a/frontools/sources.py +++ b/frontools/sources.py @@ -29,7 +29,7 @@ class Browser: self._browser_context = browser_context @asynccontextmanager - async def load_page(self, url: str) -> AsyncGenerator[Page, None]: + async def load_page(self, url: str) -> AsyncGenerator[Optional[Page], None]: page = await self._browser_context.new_page() await page.route("*", self._source.route) for retry in range(0, 3): @@ -53,8 +53,11 @@ class Browser: class Source(ABC): """Base class for sources""" - def __init__(self, error_summary: ErrorSummary) -> None: + def __init__( + self, error_summary: ErrorSummary, block_urls: list[Pattern[str]] + ) -> None: self._error_summary = error_summary + self._block_urls = block_urls @abstractmethod async def get_url(self, url: str) -> bytes: @@ -86,15 +89,24 @@ class Source(ABC): await browser.close() async def route(self, route: Route) -> None: - content = await self.get_url(route.request.url) - await route.fulfill(body=content, status=200) + url = route.request.url + if any([pattern.match(url) for pattern in self._block_urls]): + await route.fulfill(status=500) + else: + content = await self.get_url(url) + await route.fulfill(body=content, status=200) class CachedSource(Source): """Source loading urls from the internet.""" - def __init__(self, error_summary: ErrorSummary, cache: Cache[bytes]) -> None: - super().__init__(error_summary) + def __init__( + self, + error_summary: ErrorSummary, + block_urls: list[Pattern[str]], + cache: Cache[bytes], + ) -> None: + super().__init__(error_summary, block_urls) self._cache = cache async def get_url(self, url: str) -> bytes: @@ -118,10 +130,11 @@ class OverrideSource(Source): def __init__( self, error_summary: ErrorSummary, + block_urls: list[Pattern[str]], mappings: list[tuple[str, str]], next_source: Source, ): - super().__init__(error_summary) + super().__init__(error_summary, block_urls) self._mappings: list[tuple[Pattern[str], str]] = [] self._next_source = next_source