"""Store themes and associated urls, providing ways to load them from several sources.""" from asyncio import create_subprocess_shell from asyncio.subprocess import PIPE from json import loads from logging import getLogger from pathlib import Path from re import compile as re_compile from ssl import CERT_NONE, create_default_context from typing import AsyncIterable, Iterable, Optional from aiohttp import ClientSession from bs4 import BeautifulSoup from xdg import xdg_cache_home from yaml import Loader, dump from yaml import load as load_yaml from frontools.utils import TaskListType, report_progress _LOGGER = getLogger(__file__) ThemeIndexData = dict[str, dict[str, list[str]]] UrlEntry = tuple[str, list[str], Optional[str]] # (url, tags, theme name) tuples NodeEntry = tuple[str, list[str]] # (server address, tags) tuples class _Inputs: urls: list[UrlEntry] = [] yaml_files: list[Path] = [] nodes: list[NodeEntry] = [] class ThemeIndex: """Store themes and associated urls, providing ways to load them from several sources.""" def __init__(self) -> None: self._themes: ThemeIndexData = {} self._inputs = _Inputs() @property def urls(self) -> Iterable[tuple[str, str, set[str]]]: """Return themes configured for this context""" for theme_name, theme in self._themes.items(): for url, tags in theme.items(): yield theme_name, url, set(tags) def add_urls(self, *urls: UrlEntry) -> None: """Add an url for a theme""" self._inputs.urls.extend(urls) def add_nodes(self, *nodes: NodeEntry) -> None: """Add an url for a theme""" self._inputs.nodes.extend(nodes) def add_yaml(self, yaml_path: Path) -> None: """Load a yaml file containing dictionnary of urls to add as themes.""" self._inputs.yaml_files.append(yaml_path) async def load(self, update_cache: bool) -> None: index_cache = xdg_cache_home() / "frontools" / "index-cache.yaml" if index_cache.is_file() and not update_cache: _Inputs.yaml_files.append(index_cache) else: await self._load_urls_without_theme() await self._load_urls_from_nodes() with open(index_cache, "w") as index_cache_handle: dump(self._themes, index_cache_handle) self._load_yaml_files() async def _load_urls_without_theme(self) -> None: async def _load(url: str, tags: list[str]) -> None: theme = await _get_theme(url) if theme is None: return self._register(url, tags, theme) tasks: TaskListType = [ (url, _load(url, tags)) for (url, tags, theme) in self._inputs.urls if theme is None ] await report_progress( "Gathering themes from configured urls", tasks, 10, ) async def _load_urls_from_nodes(self) -> None: async def _load(node: str, node_tags: list[str]) -> None: async for url, tags, theme in _get_node_urls(node): assert theme is not None self._register(url, tags + node_tags, theme) await report_progress( "Gathering themes from configured nodes", [(node, _load(node, tags)) for node, tags in self._inputs.nodes], 5, ) async def _load_urls_with_theme(self) -> None: for url, tags, theme in self._inputs.urls: if theme is not None: self._register(url, tags, theme) def _load_yaml_files(self) -> None: for yaml_path in self._inputs.yaml_files: with open(yaml_path, "r", encoding="utf-8") as yaml_file: yaml_document = load_yaml(yaml_file, Loader) for theme_name, urls in yaml_document.items(): for url, tags in urls.items(): self._register(url, tags, theme_name) def _register(self, url: str, new_tags: list[str], theme_name: str) -> None: """Add an url for a theme""" if theme_name is None: self._unknown_themes.append((url, new_tags)) return theme = self._themes.get(theme_name, None) if theme is None: theme = {} self._themes[theme_name] = theme tags = theme.get(url, None) if tags is None: tags = [] theme[url] = tags if new_tags is None: return for tag in new_tags: if tag not in tags: tags.append(tag) THEME_CSS_PATH_PATTERN = re_compile(r".*static/(?P[\w-]*)/style.css.*") async def _get_theme(url: str) -> Optional[str]: try: async with ClientSession() as session: ssl_context = create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = CERT_NONE ssl_context.set_ciphers("DEFAULT@SECLEVEL=1") async with session.get(url, ssl_context=ssl_context) as response: page_content = await response.content.read() page_html = BeautifulSoup(page_content, features="html5lib") links = page_html.find_all("link") for link in links: if "stylesheet" not in link.get("rel", []): continue href = link["href"] theme_match = THEME_CSS_PATH_PATTERN.match(href) if not theme_match: continue return theme_match["theme"] except Exception as ex: _LOGGER.error(f"Error while loading {url} : {ex} skipping") return None _LOGGER.error(f"No theme found for url {url}") return None CAT_HOBO_JSON_SCRIPT = """ for hobo_json in $(find /var/lib/combo/ -name hobo.json); do sudo cat $hobo_json echo "," done """ async def _get_node_urls( node: str, ) -> AsyncIterable[UrlEntry]: process = await create_subprocess_shell( f"ssh {node} -C bash", stdin=PIPE, stdout=PIPE, stderr=PIPE, ) stdout, stderr = await process.communicate(CAT_HOBO_JSON_SCRIPT.encode("utf-8")) if process.returncode != 0: print(stderr.decode("utf-8")) exit(1) stdout_string = stdout.decode("utf-8").strip() config_array_content = f"[ {stdout_string[:-1]} ]" config_array = loads(config_array_content) for tenant_config in config_array: services = tenant_config["services"] variables = tenant_config.get("variables", None) if variables is None: continue theme: Optional[str] = variables.get("theme", None) if theme is None: continue for service in services: service_id = service.get("service-id", None) secondary = service.get("secondary", False) template_name = service.get("template_name", None) if ( service_id not in ["combo", "wcs", "authentic"] or secondary or template_name == "portal-agent" ): continue base_url = service.get("base_url", None) if base_url: if service_id == "authentic": base_url = base_url + "login/" yield (base_url, [service_id], theme)