167 lines
5.3 KiB
Python
167 lines
5.3 KiB
Python
"""Source for remote files"""
|
|
from abc import ABC, abstractmethod
|
|
from contextlib import asynccontextmanager
|
|
from pathlib import Path
|
|
from re import Pattern
|
|
from re import compile as re_compile
|
|
from typing import AsyncGenerator, AsyncIterable, Optional, cast
|
|
|
|
from aiohttp import ClientConnectionError, ClientSession
|
|
from bs4 import BeautifulSoup
|
|
from playwright.async_api import (
|
|
BrowserContext,
|
|
Error,
|
|
Page,
|
|
Route,
|
|
TimeoutError,
|
|
ViewportSize,
|
|
async_playwright,
|
|
)
|
|
|
|
from frontools.cache import Cache
|
|
from frontools.utils import ErrorSummary
|
|
|
|
|
|
class Browser:
    """Wraps a browser context, with helper methods to load pages."""

    def __init__(self, source: "Source", browser_context: BrowserContext) -> None:
        """Keep the source and the browser context used by `load_page`.

        :param source: object whose `route` coroutine is installed as the
            request handler of every page, and whose `_error_summary`
            receives loading errors.
        :param browser_context: Playwright context used to open new pages.
        """
        self._source = source
        self._browser_context = browser_context

    @asynccontextmanager
    async def load_page(self, url: str) -> AsyncGenerator[Optional[Page], None]:
        """Open a new page, navigate it to `url` and yield it.

        Navigation is attempted up to three times. Failures are not raised:
        they are recorded in the source's error summary and the page is
        yielded anyway, in whatever state it is in. The page is always
        closed when the context manager exits, even if the caller's body
        raises.

        :param url: address to load.
        """
        attempts = 3
        page = await self._browser_context.new_page()
        try:
            # Every request made by the page goes through the source.
            await page.route("*", self._source.route)
            for attempt in range(1, attempts + 1):
                try:
                    await page.goto(url)
                    await page.wait_for_load_state("networkidle")
                    break
                except TimeoutError:
                    # TimeoutError is the Playwright one imported by this module.
                    # Only report once the last attempt has failed; earlier
                    # timeouts are silently retried.
                    if attempt == attempts:
                        self._source._error_summary.add_error(
                            f"Timeout while loading {url} : retried 3 times, skipping"
                        )
                except Error as ex:
                    # Other Playwright errors are reported on each attempt,
                    # then the loop moves on to the next attempt.
                    self._source._error_summary.add_error(
                        f"Error while loading {url} : {ex}"
                    )
            yield page
        finally:
            await page.close()
|
|
|
|
|
|
class Source(ABC):
    """Base class for sources.

    A source knows how to retrieve the content of a url (`get_url`) and can
    start a Playwright browser whose pages are served through it
    (`get_browser` / `route`).
    """

    def __init__(
        self, error_summary: ErrorSummary, block_urls: list[Pattern[str]]
    ) -> None:
        """Store the error collector and the list of blocked url patterns.

        :param error_summary: collector for loading errors.
        :param block_urls: compiled patterns; a request whose url matches one
            of them (via `Pattern.match`) is answered with a 500 status by
            `route`.
        """
        self._error_summary = error_summary
        self._block_urls = block_urls

    @abstractmethod
    async def get_url(self, url: str) -> bytes:
        """Retrieve the given url content"""

    @asynccontextmanager
    async def get_browser(
        self, width: Optional[int] = None, height: Optional[int] = None
    ) -> AsyncGenerator[Browser, None]:
        """Yield a `Browser` backed by a headless Firefox whose requests go through this source.

        :param width: viewport width; when None no viewport is passed to
            Playwright.
        :param height: accepted but not used: the viewport height is fixed
            because screenshots are taken full page.

        The browser is closed when the context manager exits, even if the
        caller's body raises.
        """
        viewport: ViewportSize = cast(
            ViewportSize, None
        )  # Playwright typings are broken

        if width is not None:
            # height is not used, as screenshot are taken full page
            viewport = {"width": width, "height": 600}

        async with async_playwright() as pwright:
            browser = await pwright.firefox.launch(headless=True)
            try:
                context = await browser.new_context(
                    viewport=viewport, ignore_https_errors=True
                )
                yield Browser(self, context)
            finally:
                await browser.close()

    async def route(self, route: Route) -> None:
        """Playwright request handler: block or serve the requested url.

        Blocked urls are fulfilled with status 500; any other url is
        fulfilled with status 200 and the bytes returned by `get_url`.
        """
        url = route.request.url
        if any(pattern.match(url) for pattern in self._block_urls):
            await route.fulfill(status=500)
        else:
            content = await self.get_url(url)
            await route.fulfill(body=content, status=200)
|
|
|
|
|
|
class CachedSource(Source):
    """Source resolving urls through a cache, with an HTTP download as loader."""

    def __init__(
        self,
        error_summary: ErrorSummary,
        block_urls: list[Pattern[str]],
        cache: Cache,
    ) -> None:
        """Initialise the base source and remember the cache to query."""
        super().__init__(error_summary, block_urls)
        self._cache = cache

    async def get_url(self, url: str) -> bytes:
        """Return the content of `url` as provided by the cache.

        `_load_url` is handed to the cache as the loader callback
        (presumably called on a cache miss; confirm against Cache.get).
        """
        content = await self._cache.get(url, self._load_url)
        return content

    async def _load_url(self, url: str) -> bytes:
        """Download `url` with a fresh aiohttp session.

        A connection error is recorded in the error summary and an empty
        bytes object is returned instead.
        """
        try:
            async with ClientSession() as http, http.get(url) as reply:
                return await reply.content.read()
        except ClientConnectionError as ex:
            self._error_summary.add_error(f"error while loading {url} : {ex}")
            return b""
|
|
|
|
|
|
class OverrideSource(Source):
    """Source serving local files for urls matching patterns, else delegating."""

    def __init__(
        self,
        error_summary: ErrorSummary,
        block_urls: list[Pattern[str]],
        mappings: list[tuple[str, str]],
        next_source: Source,
    ):
        """Compile the url mappings and remember the fallback source.

        :param mappings: (regex, replacement) pairs; the replacement is
            applied to a matching url to build a local file path.
        :param next_source: source queried when no local file applies.
        """
        super().__init__(error_summary, block_urls)
        self._next_source = next_source
        self._mappings: list[tuple[Pattern[str], str]] = [
            (re_compile(regex), replacement) for regex, replacement in mappings
        ]

    async def get_url(self, url: str) -> bytes:
        """Return the bytes of the first existing local file mapped to `url`.

        Mappings are tried in order. If none matches, or the mapped paths are
        not existing files, the request is forwarded to the next source.
        """
        for regex, replacement in self._mappings:
            if not regex.match(url):
                continue
            local_file = Path(regex.sub(replacement, url))
            if local_file.is_file():
                return local_file.read_bytes()

        return await self._next_source.get_url(url)
|
|
|
|
|
|
async def get_page_stylesheets(source: Source, url: str) -> AsyncIterable[str]:
    """Yield the stylesheet urls referenced by a given page.

    The page is fetched through `source`, parsed with the html5lib parser,
    and the `href` of every `<link>` element whose `rel` contains
    "stylesheet" is yielded as written in the document.

    :param source: source used to retrieve the page content.
    :param url: address of the page to inspect.
    """
    page_content = await source.get_url(url)
    page_html = BeautifulSoup(page_content, features="html5lib")
    for link in page_html.find_all("link"):
        if "stylesheet" not in link.get("rel", []):
            continue

        # A stylesheet link without href has nothing to yield; skip it
        # instead of aborting the whole iteration with a KeyError.
        href = link.get("href")
        if href is None:
            continue

        yield href
|