common: black format
This commit is contained in:
parent
4abebfddab
commit
134828ea87
|
@ -2,6 +2,6 @@
|
|||
|
||||
from .config import Config, SiteConfig
|
||||
from .sources import get_page_stylesheets
|
||||
from .utils import report_progress
|
||||
from .utils import report_progress
|
||||
|
||||
__all__ = ["Config", "SiteConfig", "report_progress", "get_page_stylesheets"]
|
||||
|
|
|
@ -3,12 +3,14 @@ from contextlib import asynccontextmanager
|
|||
from typing import AsyncGenerator
|
||||
|
||||
from playwright.async_api import BrowserContext, Route, async_playwright
|
||||
|
||||
from frontools.sources import Source
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def get_cached_browser(source: Source) -> AsyncGenerator[BrowserContext, None]:
|
||||
"""Return a Playwright browser that will eventually get files from local cache"""
|
||||
|
||||
async def _cache_route(route: Route) -> None:
|
||||
content = await source.get_url(route.request.url)
|
||||
await route.fulfill(body=content)
|
||||
|
|
|
@ -2,10 +2,10 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
from pickle import dumps, loads
|
||||
from shutil import rmtree
|
||||
from typing import Awaitable, Callable, Generic, TypeVar, Union, cast
|
||||
|
||||
from click import echo
|
||||
from shutil import rmtree
|
||||
from xdg import xdg_cache_home
|
||||
|
||||
ResourceType = TypeVar("ResourceType")
|
||||
|
@ -39,9 +39,9 @@ class Cache(Generic[ResourceType], ABC):
|
|||
for cache_name in cache_names:
|
||||
cache_path: Path = Cache.cache_base / cache_name
|
||||
if not cache_path.is_dir():
|
||||
echo(f'{cache_path} isn\'t a chache directory', err=True)
|
||||
echo(f"{cache_path} isn't a chache directory", err=True)
|
||||
continue
|
||||
echo(f'Removing {cache_path}')
|
||||
echo(f"Removing {cache_path}")
|
||||
rmtree(cache_path)
|
||||
|
||||
@staticmethod
|
||||
|
@ -75,7 +75,9 @@ class FileCache(Cache[ResourceType]):
|
|||
self._name = name
|
||||
|
||||
async def get(
|
||||
self, key: str, fallback: CacheFallback[ResourceType]
|
||||
self,
|
||||
key: str,
|
||||
fallback: CacheFallback[ResourceType],
|
||||
) -> ResourceType:
|
||||
"""Get an item in the cache, call fallback if it's not present"""
|
||||
cache_file_path = self._get_cache_file_path(key)
|
||||
|
@ -100,7 +102,7 @@ class FileCache(Cache[ResourceType]):
|
|||
key_slug = _get_key_slug(key)
|
||||
cache_directory = self.cache_base / self._name
|
||||
file_path = cache_directory.joinpath(*key_slug.split("&"))
|
||||
file_path = file_path.parent / (file_path.name[:255] + '_')
|
||||
file_path = file_path.parent / (file_path.name[:254] + "_")
|
||||
file_directory = file_path.parent
|
||||
|
||||
if not file_directory.is_dir():
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
from gettext import gettext as _
|
||||
from importlib.util import module_from_spec, spec_from_file_location
|
||||
from pathlib import Path
|
||||
from re import Pattern
|
||||
from re import compile as re_compile
|
||||
from typing import Iterable, Optional
|
||||
|
||||
|
@ -40,6 +41,7 @@ class Config:
|
|||
self._use_cache = use_cache
|
||||
self._sources: dict[str, Source] = {}
|
||||
self._sites: dict[str, SiteConfig] = {}
|
||||
self._block_urls: list[Pattern[str]] = []
|
||||
|
||||
if default_source_name is None:
|
||||
self._default_source_name = REMOTE_SOURCE_NAME
|
||||
|
@ -49,7 +51,8 @@ class Config:
|
|||
self._error_summary = ErrorSummary()
|
||||
remote_cache = self.get_data_cache(REMOTE_SOURCE_NAME)
|
||||
self._add_source(
|
||||
REMOTE_SOURCE_NAME, CachedSource(self._error_summary, remote_cache)
|
||||
REMOTE_SOURCE_NAME,
|
||||
CachedSource(self._error_summary, self._block_urls, remote_cache),
|
||||
)
|
||||
self._include_urls = [re_compile(it) for it in include_urls]
|
||||
self._exclude_urls = [re_compile(it) for it in exclude_urls]
|
||||
|
@ -123,6 +126,11 @@ class Config:
|
|||
|
||||
self._sites[name].urls.append(url)
|
||||
|
||||
def block_url_patterns(self, *patterns: str) -> None:
    """Register url patterns that sources must answer with a 500 error.

    All patterns are compiled before any of them is registered, so an
    invalid regular expression leaves the block list untouched instead of
    half-updated.

    :param patterns: regular expressions, compiled with ``re.compile``.
    :raises re.error: if one of the patterns is not a valid regular expression.
    """
    compiled = [re_compile(pattern) for pattern in patterns]
    # Extend in place rather than rebinding: this very list object is handed
    # to the sources when they are created, so they only see new patterns if
    # the list itself is mutated.
    self._block_urls.extend(compiled)
|
||||
|
||||
def get_data_cache(self, name: str) -> Cache[bytes]:
|
||||
"""Get a data cache with the given identifier"""
|
||||
if self._use_cache:
|
||||
|
@ -149,7 +157,9 @@ class Config:
|
|||
next_source = self.default_source
|
||||
else:
|
||||
next_source = self.get_source(next_source_name)
|
||||
self._sources[name] = OverrideSource(self._error_summary, mappings, next_source)
|
||||
self._sources[name] = OverrideSource(
|
||||
self._error_summary, self._block_urls, mappings, next_source
|
||||
)
|
||||
|
||||
def get_source(self, name: str) -> Source:
|
||||
"""Get an alternate source in the configured ones"""
|
||||
|
|
|
@ -1,13 +1,18 @@
|
|||
"""Css related functions"""
|
||||
from enum import Enum
|
||||
from typing import Iterable
|
||||
from typing import AsyncIterable, Iterator
|
||||
from typing import AsyncIterable, Iterable, Iterator
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from tinycss2 import parse_stylesheet
|
||||
from tinycss2.ast import (
|
||||
IdentToken,
|
||||
LiteralToken,
|
||||
Node,
|
||||
SquareBracketsBlock,
|
||||
WhitespaceToken,
|
||||
)
|
||||
from tinycss2.parser import QualifiedRule
|
||||
from tinycss2.ast import WhitespaceToken, LiteralToken, IdentToken, Node, SquareBracketsBlock
|
||||
|
||||
from frontools.sources import Source
|
||||
|
||||
|
@ -19,8 +24,12 @@ async def css_diff(url: str, left_source: Source, right_source: Source) -> None:
|
|||
if css_url is None:
|
||||
continue
|
||||
right_stylesheet_content = await right_source.get_url(urljoin(url, css_url))
|
||||
left_sheet = parse_stylesheet(left_stylesheet_content.decode('utf-8'), True, True)
|
||||
right_sheet = parse_stylesheet(right_stylesheet_content.decode('utf-8'), True, True)
|
||||
left_sheet = parse_stylesheet(
|
||||
left_stylesheet_content.decode("utf-8"), True, True
|
||||
)
|
||||
right_sheet = parse_stylesheet(
|
||||
right_stylesheet_content.decode("utf-8"), True, True
|
||||
)
|
||||
|
||||
left_selector_index = _get_selector_index(left_sheet)
|
||||
right_selector_index = _get_selector_index(right_sheet)
|
||||
|
@ -49,7 +58,7 @@ class Combinator(Enum):
|
|||
class Selector:
    """A CSS selector: its class names plus the selectors combined with it."""

    def __init__(self) -> None:
        # Class names carried by this selector.
        self._classes: set[str] = set()
        # Selectors attached to this one, keyed by the combinator linking them.
        # This must be an empty dict *instance*: assigning the bare
        # ``dict[Combinator, "Selector"]`` expression stores the generic alias
        # type itself, which cannot hold any entry.
        self._combined: dict[Combinator, "Selector"] = {}
|
||||
|
||||
|
||||
def _parse_selector(token_iterator: Iterator[Node]) -> Selector:
|
||||
|
@ -57,26 +66,26 @@ def _parse_selector(token_iterator: Iterator[Node]) -> Selector:
|
|||
classes: set[str] = set()
|
||||
child_selectors: dict[Combinator, Selector] = {}
|
||||
token = next(token_iterator)
|
||||
tag = ''
|
||||
tag = ""
|
||||
attributes: dict[str, str] = {}
|
||||
while True:
|
||||
while(isinstance(token, WhitespaceToken)):
|
||||
while isinstance(token, WhitespaceToken):
|
||||
token = next(token_iterator)
|
||||
if isinstance(token, LiteralToken):
|
||||
if token.value == '.':
|
||||
if token.value == ".":
|
||||
while True:
|
||||
token = next(token_iterator)
|
||||
assert isinstance(token, IdentToken)
|
||||
classes.add(token.value)
|
||||
token = next(token_iterator)
|
||||
|
||||
if not isinstance(token, LiteralToken) or token.value != '.':
|
||||
if not isinstance(token, LiteralToken) or token.value != ".":
|
||||
break
|
||||
else:
|
||||
combinator_mappings = {
|
||||
'+': Combinator.ADJACENT_SIBLING,
|
||||
'>': Combinator.DIRECT_CHILD,
|
||||
'~': Combinator.GENERAL_SIBLING
|
||||
"+": Combinator.ADJACENT_SIBLING,
|
||||
">": Combinator.DIRECT_CHILD,
|
||||
"~": Combinator.GENERAL_SIBLING,
|
||||
}
|
||||
if token.value in combinator_mappings:
|
||||
combinator = combinator_mappings[token.value]
|
||||
|
|
|
@ -29,7 +29,7 @@ class Browser:
|
|||
self._browser_context = browser_context
|
||||
|
||||
@asynccontextmanager
|
||||
async def load_page(self, url: str) -> AsyncGenerator[Page, None]:
|
||||
async def load_page(self, url: str) -> AsyncGenerator[Optional[Page], None]:
|
||||
page = await self._browser_context.new_page()
|
||||
await page.route("*", self._source.route)
|
||||
for retry in range(0, 3):
|
||||
|
@ -53,8 +53,11 @@ class Browser:
|
|||
class Source(ABC):
|
||||
"""Base class for sources"""
|
||||
|
||||
def __init__(self, error_summary: ErrorSummary) -> None:
|
||||
def __init__(
|
||||
self, error_summary: ErrorSummary, block_urls: list[Pattern[str]]
|
||||
) -> None:
|
||||
self._error_summary = error_summary
|
||||
self._block_urls = block_urls
|
||||
|
||||
@abstractmethod
|
||||
async def get_url(self, url: str) -> bytes:
|
||||
|
@ -86,15 +89,24 @@ class Source(ABC):
|
|||
await browser.close()
|
||||
|
||||
async def route(self, route: Route) -> None:
    """Answer an intercepted browser request from this source.

    A request whose url matches one of the block patterns is fulfilled with
    a 500 status and no body. Any other request is fulfilled with status 200
    and the bytes returned by ``get_url``.

    :param route: the intercepted request to fulfill.
    """
    url = route.request.url
    # ``Pattern.match`` only anchors at the beginning of the url, so a block
    # pattern has to describe the url from its first character.
    if any(pattern.match(url) for pattern in self._block_urls):
        await route.fulfill(status=500)
        return
    # The content is fetched only once the url is known not to be blocked,
    # and the request is fulfilled exactly once.
    content = await self.get_url(url)
    await route.fulfill(body=content, status=200)
|
||||
|
||||
|
||||
class CachedSource(Source):
|
||||
"""Source loading urls from the internet."""
|
||||
|
||||
def __init__(self, error_summary: ErrorSummary, cache: Cache[bytes]) -> None:
|
||||
super().__init__(error_summary)
|
||||
def __init__(
|
||||
self,
|
||||
error_summary: ErrorSummary,
|
||||
block_urls: list[Pattern[str]],
|
||||
cache: Cache[bytes],
|
||||
) -> None:
|
||||
super().__init__(error_summary, block_urls)
|
||||
self._cache = cache
|
||||
|
||||
async def get_url(self, url: str) -> bytes:
|
||||
|
@ -118,10 +130,11 @@ class OverrideSource(Source):
|
|||
def __init__(
|
||||
self,
|
||||
error_summary: ErrorSummary,
|
||||
block_urls: list[Pattern[str]],
|
||||
mappings: list[tuple[str, str]],
|
||||
next_source: Source,
|
||||
):
|
||||
super().__init__(error_summary)
|
||||
super().__init__(error_summary, block_urls)
|
||||
self._mappings: list[tuple[Pattern[str], str]] = []
|
||||
self._next_source = next_source
|
||||
|
||||
|
|
Loading…
Reference in New Issue