utils: postpone log print after progress bar
This commit is contained in:
parent
5741d4656c
commit
7f5d3cd283
|
@ -1,35 +1,56 @@
|
|||
"""Common utilities"""
|
||||
from asyncio import gather
|
||||
from typing import Awaitable, cast
|
||||
from contextlib import contextmanager
|
||||
from logging import Handler, LogRecord, getLogger
|
||||
from typing import Awaitable, Iterator, cast
|
||||
|
||||
from click import progressbar
|
||||
|
||||
TaskListType = list[tuple[str, Awaitable[None]]]
|
||||
|
||||
|
||||
@contextmanager
|
||||
def postpone_log() -> Iterator[None]:
|
||||
records: list[LogRecord] = []
|
||||
logger = getLogger()
|
||||
|
||||
class _Handler(Handler):
|
||||
def handle(self, record: LogRecord) -> bool:
|
||||
records.append(record)
|
||||
return False
|
||||
|
||||
handler = _Handler()
|
||||
logger.addHandler(handler)
|
||||
yield
|
||||
logger.removeHandler(handler)
|
||||
for record in records:
|
||||
logger.handle(record)
|
||||
|
||||
|
||||
async def report_progress(
|
||||
label: str, task_list: TaskListType, nb_workers: int = 5
|
||||
) -> None:
|
||||
"""Show a progress bar for a long running task list"""
|
||||
iterator = iter(task_list)
|
||||
|
||||
with progressbar(
|
||||
length=len(task_list),
|
||||
label=label,
|
||||
item_show_func=lambda it: cast(str, it),
|
||||
) as progress:
|
||||
with postpone_log():
|
||||
with progressbar(
|
||||
length=len(task_list),
|
||||
label=label,
|
||||
item_show_func=lambda it: cast(str, it),
|
||||
) as progress:
|
||||
|
||||
async def worker() -> None:
|
||||
try:
|
||||
while True:
|
||||
item_name, task = next(iterator)
|
||||
progress.update(1, current_item=item_name) # type: ignore
|
||||
await task
|
||||
except StopIteration:
|
||||
pass
|
||||
async def worker() -> None:
|
||||
try:
|
||||
while True:
|
||||
item_name, task = next(iterator)
|
||||
progress.update(1, current_item=item_name) # type: ignore
|
||||
await task
|
||||
except StopIteration:
|
||||
pass
|
||||
|
||||
workers = [worker() for _ in range(0, nb_workers)]
|
||||
await gather(*workers)
|
||||
workers = [worker() for _ in range(0, nb_workers)]
|
||||
await gather(*workers)
|
||||
|
||||
|
||||
def get_url_slug(url: str) -> str:
|
||||
|
|
Loading…
Reference in New Issue