misc-csechet/frontools/utils.py

82 lines
2.3 KiB
Python
Raw Normal View History

"""Common utilities"""
2022-03-30 01:03:31 +02:00
from asyncio import gather
from datetime import datetime
from os.path import expandvars
from pathlib import Path
from re import compile as re_compile
from typing import Awaitable, cast
from click import echo, progressbar
from xdg import xdg_config_home
class ErrorSummary:
    """Accumulate error messages so they can be printed together later."""

    def __init__(self) -> None:
        """Start with an empty list of recorded errors."""
        self._errors: list[str] = []

    def add_error(self, message: str) -> None:
        """Record *message* so that it is included in the summary."""
        self._errors.append(message)

    def echo(self) -> None:
        """Print the summary header followed by every recorded error.

        Nothing is printed when no error has been recorded. The header is
        echoed without ``err=True`` while each error line is echoed with
        ``err=True``.
        """
        if self._errors:
            echo("***** Error summary :")
            for recorded in self._errors:
                echo(recorded, err=True)
2022-03-30 02:39:56 +02:00
def get_default_screenshot_directory(config_home: "Path | None" = None) -> Path:
    """Return the default folder where to store screenshots.

    The parent folder is the ``XDG_PICTURES_DIR`` entry of the
    ``user-dirs.dirs`` file found in the XDG config home, with environment
    variables expanded. When the file does not exist or has no such entry,
    the user's home directory is used instead. A sub-folder named after the
    current local date and time (``YYYY-MM-DD - HH-MM-SS``) is appended.

    Args:
        config_home: Directory containing ``user-dirs.dirs``. Defaults to the
            value returned by ``xdg_config_home()``.

    Returns:
        The path of the timestamped screenshot directory (it is not created).
    """
    if config_home is None:
        config_home = xdg_config_home()

    # Optional whitespace is tolerated around the key, the '=' and the value.
    pictures_dir_re = re_compile(r'\s*XDG_PICTURES_DIR\s*="(?P<path>.*)"\s*$')
    pictures_dir = None

    try:
        with open(
            Path(config_home) / "user-dirs.dirs", encoding="utf-8"
        ) as user_dirs:
            for line in user_dirs:
                match = pictures_dir_re.match(line)
                if match is not None:
                    # Values usually look like "$HOME/Pictures".
                    pictures_dir = Path(expandvars(match.group("path")))
                    break
    except FileNotFoundError:
        # No user-dirs.dirs file: fall back to the home directory below.
        pass

    if pictures_dir is None:
        pictures_dir = Path.home()

    # %M is minutes; %m would repeat the month in the time part.
    return pictures_dir / datetime.now().strftime("%Y-%m-%d - %H-%M-%S")
2022-03-30 01:03:31 +02:00
# Task list consumed by report_progress: (item name, awaitable) pairs.
TaskListType = list[tuple[str, Awaitable[None]]]
async def report_progress(
    label: str, task_list: TaskListType, nb_workers: int = 5
) -> None:
    """Await every task of *task_list* while displaying a progress bar.

    Args:
        label: Text displayed next to the progress bar.
        task_list: ``(item name, awaitable)`` pairs to process.
        nb_workers: Number of worker coroutines pulling from the list; each
            worker awaits one item at a time.
    """
    pending = iter(task_list)

    def show_item(current: object) -> str:
        # Items handed to the bar are the names passed to update() below.
        return cast(str, current)

    with progressbar(
        length=len(task_list), label=label, item_show_func=show_item
    ) as bar:

        async def consume() -> None:
            # The iterator is shared by all workers, so each pair is taken
            # by exactly one of them; a worker ends when it is exhausted.
            for name, awaitable in pending:
                # The bar advances when the item is picked up, before it
                # is awaited.
                bar.update(1, current_item=name)  # type: ignore
                await awaitable

        await gather(*(consume() for _ in range(nb_workers)))
2022-03-30 02:39:56 +02:00
2022-04-05 16:34:45 +02:00
def get_url_slug(url: str) -> str:
    """Return a slug usable as a path name for the given url.

    Each ``_`` becomes ``___``, each ``/`` becomes ``__`` and each ``:``
    becomes ``_``; every other character is kept as is.

    NOTE: the mapping is not collision-free, e.g. ``"a:/b"`` and ``"a_b"``
    both give ``"a___b"``.
    """
    substitutions = str.maketrans({"_": "___", "/": "__", ":": "_"})
    return url.translate(substitutions)