common: black format

Corentin Sechet 2022-04-11 16:31:19 +02:00
parent 4abebfddab
commit 134828ea87
6 changed files with 64 additions and 28 deletions

View File

@@ -2,6 +2,6 @@
from .config import Config, SiteConfig
from .sources import get_page_stylesheets
from .utils import report_progress
__all__ = ["Config", "SiteConfig", "report_progress", "get_page_stylesheets"]

View File

@@ -3,12 +3,14 @@ from contextlib import asynccontextmanager
from typing import AsyncGenerator
from playwright.async_api import BrowserContext, Route, async_playwright
from frontools.sources import Source
@asynccontextmanager
async def get_cached_browser(source: Source) -> AsyncGenerator[BrowserContext, None]:
"""Return a Playwright browser that will eventually get files from local cache"""
async def _cache_route(route: Route) -> None:
content = await source.get_url(route.request.url)
await route.fulfill(body=content)
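As a quick orientation for the context manager above, here is a hedged usage sketch. It assumes this file is importable as frontools.browsers (the file name is not visible in this diff) and that a concrete Source such as the CachedSource from the last file is passed in; the URL is invented.

from frontools.browsers import get_cached_browser  # assumed module path for this file
from frontools.sources import Source


async def screenshot_home_page(source: Source) -> bytes:
    # _cache_route (presumably registered on the context further down in this
    # file) answers page requests through source.get_url instead of the network.
    async with get_cached_browser(source) as context:
        page = await context.new_page()
        await page.goto("https://example.com/")  # hypothetical URL
        return await page.screenshot()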

View File

@@ -2,10 +2,10 @@
from abc import ABC, abstractmethod
from pathlib import Path
from pickle import dumps, loads
from shutil import rmtree
from typing import Awaitable, Callable, Generic, TypeVar, Union, cast
from click import echo
from shutil import rmtree
from xdg import xdg_cache_home
ResourceType = TypeVar("ResourceType")
@@ -39,9 +39,9 @@ class Cache(Generic[ResourceType], ABC):
for cache_name in cache_names:
cache_path: Path = Cache.cache_base / cache_name
if not cache_path.is_dir():
echo(f'{cache_path} isn\'t a cache directory', err=True)
echo(f"{cache_path} isn't a cache directory", err=True)
continue
echo(f'Removing {cache_path}')
echo(f"Removing {cache_path}")
rmtree(cache_path)
@staticmethod
@@ -75,7 +75,9 @@ class FileCache(Cache[ResourceType]):
self._name = name
async def get(
self, key: str, fallback: CacheFallback[ResourceType]
self,
key: str,
fallback: CacheFallback[ResourceType],
) -> ResourceType:
"""Get an item in the cache, call fallback if it's not present"""
cache_file_path = self._get_cache_file_path(key)
@@ -100,7 +102,7 @@ class FileCache(Cache[ResourceType]):
key_slug = _get_key_slug(key)
cache_directory = self.cache_base / self._name
file_path = cache_directory.joinpath(*key_slug.split("&"))
file_path = file_path.parent / (file_path.name[:255] + '_')
file_path = file_path.parent / (file_path.name[:254] + "_")
file_directory = file_path.parent
if not file_directory.is_dir():
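A minimal sketch of how the fallback API of FileCache above might be used. The frontools.cache module path and the single-argument constructor are assumptions (only self._name = name is visible in this hunk), and the fallback body is invented.

from frontools.cache import FileCache  # assumed module path for this file


async def load_page_source(url: str) -> bytes:
    cache: FileCache[bytes] = FileCache("pages")  # stored under Cache.cache_base, per the code above

    async def _download() -> bytes:
        # Hypothetical fallback: only awaited when the key is not cached yet.
        return b"downloaded: " + url.encode("utf-8")

    # get() returns the cached bytes, or runs _download and caches its result.
    return await cache.get(url, _download)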

View File

@@ -2,6 +2,7 @@
from gettext import gettext as _
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
from re import Pattern
from re import compile as re_compile
from typing import Iterable, Optional
@@ -40,6 +41,7 @@ class Config:
self._use_cache = use_cache
self._sources: dict[str, Source] = {}
self._sites: dict[str, SiteConfig] = {}
self._block_urls: list[Pattern[str]] = []
if default_source_name is None:
self._default_source_name = REMOTE_SOURCE_NAME
@@ -49,7 +51,8 @@ class Config:
self._error_summary = ErrorSummary()
remote_cache = self.get_data_cache(REMOTE_SOURCE_NAME)
self._add_source(
REMOTE_SOURCE_NAME, CachedSource(self._error_summary, remote_cache)
REMOTE_SOURCE_NAME,
CachedSource(self._error_summary, self._block_urls, remote_cache),
)
self._include_urls = [re_compile(it) for it in include_urls]
self._exclude_urls = [re_compile(it) for it in exclude_urls]
@@ -123,6 +126,11 @@ class Config:
self._sites[name].urls.append(url)
def block_url_patterns(self, *patterns: str) -> None:
"""Will return 500 error for urls matching this pattern."""
for pattern in patterns:
self._block_urls.append(re_compile(pattern))
def get_data_cache(self, name: str) -> Cache[bytes]:
"""Get a data cache with the given identifier"""
if self._use_cache:
@@ -149,7 +157,9 @@ class Config:
next_source = self.default_source
else:
next_source = self.get_source(next_source_name)
self._sources[name] = OverrideSource(self._error_summary, mappings, next_source)
self._sources[name] = OverrideSource(
self._error_summary, self._block_urls, mappings, next_source
)
def get_source(self, name: str) -> Source:
"""Get an alternate source in the configured ones"""

View File

@@ -1,13 +1,18 @@
"""Css related functions"""
from enum import Enum
from typing import Iterable
from typing import AsyncIterable, Iterator
from typing import AsyncIterable, Iterable, Iterator
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from tinycss2 import parse_stylesheet
from tinycss2.ast import (
IdentToken,
LiteralToken,
Node,
SquareBracketsBlock,
WhitespaceToken,
)
from tinycss2.parser import QualifiedRule
from tinycss2.ast import WhitespaceToken, LiteralToken, IdentToken, Node, SquareBracketsBlock
from frontools.sources import Source
@@ -19,8 +24,12 @@ async def css_diff(url: str, left_source: Source, right_source: Source) -> None:
if css_url is None:
continue
right_stylesheet_content = await right_source.get_url(urljoin(url, css_url))
left_sheet = parse_stylesheet(left_stylesheet_content.decode('utf-8'), True, True)
right_sheet = parse_stylesheet(right_stylesheet_content.decode('utf-8'), True, True)
left_sheet = parse_stylesheet(
left_stylesheet_content.decode("utf-8"), True, True
)
right_sheet = parse_stylesheet(
right_stylesheet_content.decode("utf-8"), True, True
)
left_selector_index = _get_selector_index(left_sheet)
right_selector_index = _get_selector_index(right_sheet)
@@ -49,7 +58,7 @@ class Combinator(Enum):
class Selector:
def __init__(self) -> None:
self._classes: set[str] = set()
self._combined = dict[Combinator, 'Selector']
self._combined = dict[Combinator, "Selector"]
def _parse_selector(token_iterator: Iterator[Node]) -> Selector:
@@ -57,26 +66,26 @@ def _parse_selector(token_iterator: Iterator[Node]) -> Selector:
classes: set[str] = set()
child_selectors: dict[Combinator, Selector] = {}
token = next(token_iterator)
tag = ''
tag = ""
attributes: dict[str, str] = {}
while True:
while(isinstance(token, WhitespaceToken)):
while isinstance(token, WhitespaceToken):
token = next(token_iterator)
if isinstance(token, LiteralToken):
if token.value == '.':
if token.value == ".":
while True:
token = next(token_iterator)
assert isinstance(token, IdentToken)
classes.add(token.value)
token = next(token_iterator)
if not isinstance(token, LiteralToken) or token.value != '.':
if not isinstance(token, LiteralToken) or token.value != ".":
break
else:
combinator_mappings = {
'+': Combinator.ADJACENT_SIBLING,
'>': Combinator.DIRECT_CHILD,
'~': Combinator.GENERAL_SIBLING
"+": Combinator.ADJACENT_SIBLING,
">": Combinator.DIRECT_CHILD,
"~": Combinator.GENERAL_SIBLING,
}
if token.value in combinator_mappings:
combinator = combinator_mappings[token.value]
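For context on the tinycss2 calls reformatted above, here is a small standalone sketch of how a stylesheet's qualified rules expose the selector tokens that _parse_selector walks. The sample CSS is invented, and QualifiedRule is imported from tinycss2.ast here (the code above pulls it from tinycss2.parser).

from tinycss2 import parse_stylesheet
from tinycss2.ast import IdentToken, LiteralToken, QualifiedRule

css = ".button.primary > span { color: red }"

# Same flags as css_diff above: skip comments and top-level whitespace.
for rule in parse_stylesheet(css, True, True):
    if not isinstance(rule, QualifiedRule):
        continue
    # rule.prelude holds the selector tokens: LiteralToken('.'), IdentToken('button'), ...
    classes = [
        token.value
        for prev, token in zip(rule.prelude, rule.prelude[1:])
        if isinstance(prev, LiteralToken)
        and prev.value == "."
        and isinstance(token, IdentToken)
    ]
    print(classes)  # ['button', 'primary']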

View File

@@ -29,7 +29,7 @@ class Browser:
self._browser_context = browser_context
@asynccontextmanager
async def load_page(self, url: str) -> AsyncGenerator[Page, None]:
async def load_page(self, url: str) -> AsyncGenerator[Optional[Page], None]:
page = await self._browser_context.new_page()
await page.route("*", self._source.route)
for retry in range(0, 3):
@@ -53,8 +53,11 @@ class Browser:
class Source(ABC):
"""Base class for sources"""
def __init__(self, error_summary: ErrorSummary) -> None:
def __init__(
self, error_summary: ErrorSummary, block_urls: list[Pattern[str]]
) -> None:
self._error_summary = error_summary
self._block_urls = block_urls
@abstractmethod
async def get_url(self, url: str) -> bytes:
@@ -86,15 +89,24 @@ class Source(ABC):
await browser.close()
async def route(self, route: Route) -> None:
content = await self.get_url(route.request.url)
await route.fulfill(body=content, status=200)
url = route.request.url
if any([pattern.match(url) for pattern in self._block_urls]):
await route.fulfill(status=500)
else:
content = await self.get_url(url)
await route.fulfill(body=content, status=200)
class CachedSource(Source):
"""Source loading urls from the internet."""
def __init__(self, error_summary: ErrorSummary, cache: Cache[bytes]) -> None:
super().__init__(error_summary)
def __init__(
self,
error_summary: ErrorSummary,
block_urls: list[Pattern[str]],
cache: Cache[bytes],
) -> None:
super().__init__(error_summary, block_urls)
self._cache = cache
async def get_url(self, url: str) -> bytes:
@@ -118,10 +130,11 @@ class OverrideSource(Source):
def __init__(
self,
error_summary: ErrorSummary,
block_urls: list[Pattern[str]],
mappings: list[tuple[str, str]],
next_source: Source,
):
super().__init__(error_summary)
super().__init__(error_summary, block_urls)
self._mappings: list[tuple[Pattern[str], str]] = []
self._next_source = next_source
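One consequence of the load_page signature change near the top of this file is that callers now receive an Optional[Page] and have to handle None (presumably yielded when all three retries fail). A hedged sketch of a caller; the frontools.sources module path for Browser is an assumption.

from typing import Optional

from frontools.sources import Browser  # assumed module path for the Browser class above


async def page_title(browser: Browser, url: str) -> Optional[str]:
    async with browser.load_page(url) as page:
        if page is None:  # load_page now yields Optional[Page]
            return None
        return await page.title()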