misc-csechet/frontools/cache.py

109 lines
3.3 KiB
Python

"""Cache management"""
from abc import ABC, abstractmethod
from pathlib import Path
from shutil import rmtree
from typing import Awaitable, Callable, Union, Optional
from click import echo
from xdg import xdg_cache_home
CacheFallback = Union[bytes, Callable[[str], Awaitable[Optional[bytes]]]]
class Cache(ABC):
"""Base class for caches"""
@abstractmethod
async def get(self, key: str, fallback: CacheFallback) -> Optional[bytes]:
"""Get an item in the cache, call fallback if it's not present"""
@abstractmethod
def set(self, key: str, data: bytes) -> None:
"""Set a content in the cache"""
@staticmethod
async def _get_fallback_value(key: str, fallback: CacheFallback) -> Optional[bytes]:
if callable(fallback):
result = await fallback(key)
else:
result = fallback
return result
class NullCache(Cache):
"""Disabled cache"""
async def get(self, key: str, fallback: CacheFallback) -> Optional[bytes]:
return await self._get_fallback_value(key, fallback)
def set(self, key: str, data: bytes) -> None:
pass
class FileCache(Cache):
"""Cache on the local filesystem"""
cache_base = xdg_cache_home() / "frontools"
def __init__(self, name: str) -> None:
self._name = name
async def get(
self,
key: str,
fallback: CacheFallback,
) -> Optional[bytes]:
"""Get an item in the cache, call fallback if it's not present"""
cache_file_path = self._get_cache_file_path(key)
if not cache_file_path.is_file():
content = await self._get_fallback_value(key, fallback)
if content is not None:
self.set(key, content)
else:
with open(cache_file_path, "rb") as cache_file:
content = cache_file.read()
return content
def set(self, key: str, data: bytes) -> None:
"""Set content in the cache"""
cache_file_path = self._get_cache_file_path(key)
with open(cache_file_path, "wb") as cache_file:
cache_file.write(data)
@staticmethod
def prune(cache_names: list[str]) -> None:
"""Remove caches from filesystem.
If empty list is provided, all caches will be cleaned
"""
if not cache_names:
cache_names = [
it.name for it in FileCache.cache_base.iterdir() if it.is_dir()
]
for cache_name in cache_names:
cache_path: Path = FileCache.cache_base / cache_name
if not cache_path.is_dir():
echo(f"{cache_path} isn't a chache directory", err=True)
continue
echo(f"Removing {cache_path}")
rmtree(cache_path)
def _get_cache_file_path(self, key: str) -> Path:
key_slug = _get_key_slug(key)
cache_directory = self.cache_base / self._name
file_path = cache_directory.joinpath(*key_slug.split("&"))
file_path = file_path.parent / (file_path.name[:254] + "_")
file_directory = file_path.parent
if not file_directory.is_dir():
file_directory.mkdir(parents=True)
return file_path
def _get_key_slug(url: str) -> str:
"""Return an unique slug usable as a path name for a given url."""
return url.replace("_", "___").replace("/", "__").replace(":", "_")