chrono/tests/utils.py

614 lines
21 KiB
Python

# chrono - agendas system
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import re
import sys
import textwrap
import typing
import zoneinfo
from django.core.files.base import ContentFile
from django.utils.text import slugify
from chrono.agendas.models import (
Agenda,
AgendaReminderSettings,
Booking,
Desk,
Event,
MeetingType,
Resource,
TimePeriod,
TimePeriodException,
TimePeriodExceptionSource,
UnavailabilityCalendar,
)
from chrono.utils.interval import Interval
from chrono.utils.timezone import localtime
def login(app, username='admin', password='admin'):
    '''Authenticate through the first form of the '/login/' page.

    The form's 'username' and 'password' fields are filled with the given
    credentials and the form is submitted; the response must be a redirect
    (status 302). The same app object is returned so calls can be chained.
    '''
    form = app.get('/login/').forms[0]
    for field, value in (('username', username), ('password', password)):
        form[field] = value
    response = form.submit()
    assert response.status_int == 302
    return app
def build_timeperiods(desk, *timeperiods):
    '''Create weekly TimePeriod objects for a desk from compact descriptions.

    Each description is either a whitespace separated string or an already
    split sequence of tokens. The leading tokens name the days; the first
    token starting with a digit and every token after it are opening hours:

        monday-friday 9:00-12:00 14:00-17:00
        saturday 9:00-12:00
        monday,wednesday 8:30-12:00

    A day token is either a range (begin-end, begin strictly before end) or a
    comma separated list of day names. One TimePeriod is created for every
    (weekday, opening) pair.

    Raises AssertionError if a description lacks days or openings or if a
    range is not increasing, KeyError for an unknown day name in a range and
    ValueError for an unknown day name in a list or a malformed opening.
    '''
    day_to_weekday = {
        'monday': 0,
        'tuesday': 1,
        'wednesday': 2,
        'thursday': 3,
        'friday': 4,
        'saturday': 5,
        'sunday': 6,
    }
    for defn in timeperiods:
        parts = defn.strip().split() if isinstance(defn, str) else defn
        # openings start at the first token beginning with a digit
        first_opening = next((i for i, part in enumerate(parts) if part[:1].isdigit()), len(parts))
        days = parts[:first_opening]
        openings = parts[first_opening:]
        assert days and openings, defn
        for day in days:
            # day names used to be matched case-insensitively by strptime, keep accepting that
            day = day.lower()
            if '-' in day:
                begin, end = day.split('-')
                begin_weekday = day_to_weekday[begin]
                end_weekday = day_to_weekday[end]
                assert begin_weekday < end_weekday, f'{begin} must be before {end}: {day}'
                weekdays = range(begin_weekday, end_weekday + 1)
            else:
                # do not use datetime.strptime(name, '%A').weekday() here: the
                # parsed datetime is always 1900-01-01, i.e. always a monday
                names = day.split(',')
                for name in names:
                    if name not in day_to_weekday:
                        raise ValueError(f'unknown day name: {name}')
                weekdays = [day_to_weekday[name] for name in names]
            for weekday in weekdays:
                for opening in openings:
                    start, end = opening.split('-')
                    start_time = datetime.datetime.strptime(start, '%H:%M').time()
                    end_time = datetime.datetime.strptime(end, '%H:%M').time()
                    TimePeriod.objects.create(
                        desk=desk, weekday=weekday, start_time=start_time, end_time=end_time
                    )
def build_agenda(kind, label='Agenda', slug=None, **kwargs):
    '''Create and return an Agenda of the given kind.

    The slug defaults to the slugified label. Extra keyword arguments are
    passed to Agenda.objects.create() and take precedence over label, slug
    and kind.
    '''
    fields = {'label': label, 'slug': slug or slugify(label), 'kind': kind, **kwargs}
    return Agenda.objects.create(**fields)
def build_event_agenda(label='Agenda', slug=None, events=None, **kwargs):
    '''Create an 'events' agenda together with its events.

    events maps an event label to the keyword arguments given to
    Event.objects.create(). Every created event is also stored on the agenda
    as an attribute named '_' + event slug, with dashes turned into
    underscores.
    '''
    agenda = build_agenda('events', label=label, slug=slug, **kwargs)
    if events:
        for event_label, event_kwargs in events.items():
            event = Event.objects.create(agenda=agenda, label=event_label, **event_kwargs)
            attribute = '_' + event.slug.replace('-', '_')
            setattr(agenda, attribute, event)
    return agenda
def build_meetings_agenda(
    label='Agenda',
    slug=None,
    meeting_types: typing.Union[int, tuple[str, int]] = None,
    desks: typing.Union[str, dict] = None,
    resources=None,
    reminder_settings: list = None,
    **kwargs,
):
    '''Create a 'meetings' agenda with its meeting types, desks and resources.

    meeting_types: iterable of items; an int or a str is used as the slug
        argument of add_meeting_type(), anything else is expanded as its
        keyword arguments.
    desks: a (label, timeperiods) tuple, a list of such tuples, a dict
        mapping label to timeperiods, or a list of add_desk() keyword
        argument dicts.
    resources: iterable of labels; resources are fetched or created by slug,
        attached to the agenda and exposed as '_re_<slug>' attributes (dashes
        replaced by underscores).
    reminder_settings: list of keyword argument dicts for
        AgendaReminderSettings.
    Other keyword arguments are forwarded to build_agenda().
    '''
    agenda = build_agenda('meetings', label=label, slug=slug, **kwargs)

    for spec in meeting_types or ():
        mt_kwargs = {'slug': spec} if isinstance(spec, (int, str)) else spec
        add_meeting_type(agenda, **mt_kwargs)

    # normalize every accepted desks shape to a list of add_desk() kwargs
    if isinstance(desks, tuple):
        desks = [desks]
    if desks and isinstance(desks, list) and isinstance(desks[0], tuple):
        desks = dict(desks)
    if isinstance(desks, dict):
        desks = [
            {'label': desk_label, 'timeperiods': desk_timeperiods}
            for desk_label, desk_timeperiods in desks.items()
        ]
    for desk_kwargs in desks or ():
        add_desk(agenda, **desk_kwargs)

    for settings in reminder_settings or ():
        AgendaReminderSettings.objects.create(agenda=agenda, **settings)

    for resource_label in resources or ():
        resource_slug = slugify(resource_label)
        resource, _ = Resource.objects.get_or_create(
            slug=resource_slug, defaults={'label': resource_label}
        )
        agenda.resources.add(resource)
        setattr(agenda, '_re_' + resource_slug.replace('-', '_'), resource)
    return agenda
def build_virtual_agenda(
    label: str = 'Agenda',
    slug: str = None,
    agendas: list[Agenda] = None,
    meeting_types: list = None,
    **kwargs,
):
    '''Create a 'virtual' agenda and attach its real agendas.

    agendas is a list whose items are Agenda instances or keyword argument
    dicts for build_meetings_agenda(); a dict mapping label to such keyword
    arguments is accepted too. Definitions without meeting types receive the
    meeting_types given here (definition dicts passed in a list are updated
    in place). Real agendas and the virtual meeting types are exposed on the
    returned agenda as '_<slug>' attributes, dashes replaced by underscores.
    '''
    virtual_agenda = build_agenda('virtual', label=label, slug=slug, **kwargs)

    if isinstance(agendas, dict):
        agendas = [{'label': name, **definition} for name, definition in agendas.items()]

    for member in agendas or ():
        if not isinstance(member, Agenda):
            member['meeting_types'] = member.get('meeting_types') or meeting_types
            member = build_meetings_agenda(**member)
        virtual_agenda.real_agendas.add(member)
        setattr(virtual_agenda, '_' + member.slug.replace('-', '_'), member)

    for meeting_type in virtual_agenda.iter_meetingtypes():
        meeting_type.agenda = virtual_agenda
        setattr(virtual_agenda, '_' + meeting_type.slug.replace('-', '_'), meeting_type)
    return virtual_agenda
def build_unavailability_calendar(label: str, slug: str = None, exceptions=None, exception_sources=None):
    '''Create or update an UnavailabilityCalendar and fill it.

    The calendar is looked up by slug (the slugified label by default) and
    its label is set. exceptions and exception_sources are lists of keyword
    argument dicts for add_exception() and add_exception_source().
    '''
    calendar, _created = UnavailabilityCalendar.objects.update_or_create(
        slug=slug or slugify(label), defaults={'label': label}
    )
    for exception_kwargs in exceptions or ():
        add_exception(calendar, **exception_kwargs)
    for source_kwargs in exception_sources or ():
        add_exception_source(calendar, **source_kwargs)
    return calendar
def add_unavailability_calendar(desk: Desk, **kwargs):
    '''Build an unavailability calendar and link it to the desk.

    Keyword arguments are those of build_unavailability_calendar(). The
    calendar is also stored on the desk as a '_<slug>' attribute, dashes
    replaced by underscores.
    '''
    calendar = build_unavailability_calendar(**kwargs)
    calendar.desks.add(desk)
    attribute = '_' + calendar.slug.replace('-', '_')
    setattr(desk, attribute, calendar)
    return calendar
def add_desk(
    agenda: Agenda,
    label: str,
    timeperiods=None,
    exceptions=None,
    exception_sources=None,
    unavailability_calendars=None,
):
    '''Create a desk on the agenda and populate it.

    timeperiods: one description or a list of descriptions understood by
        build_timeperiods().
    exceptions, exception_sources, unavailability_calendars: lists of keyword
        argument dicts for add_exception(), add_exception_source() and
        add_unavailability_calendar().

    The desk is exposed on the agenda as a '_<slug>' attribute, dashes
    replaced by underscores.

    Raises ValueError, chained to the original error, when the timeperiods
    cannot be built.
    '''
    desk = Desk.objects.create(label=label, agenda=agenda, slug=slugify(label))
    setattr(agenda, f'_{desk.slug.replace("-", "_")}', desk)
    if timeperiods:
        if not isinstance(timeperiods, list):
            timeperiods = [timeperiods]
        try:
            build_timeperiods(desk, *timeperiods)
        except Exception as e:
            # keep the original error as explicit cause for easier debugging
            raise ValueError(f'invalid timeperiods "{e}": {timeperiods}') from e
    for exception in exceptions or []:
        add_exception(desk, **exception)
    for exception_source in exception_sources or []:
        add_exception_source(desk, **exception_source)
    for unavailability_calendar in unavailability_calendars or []:
        add_unavailability_calendar(desk, **unavailability_calendar)
    return desk
def add_meeting_type(agenda: Agenda, slug: str, label: str = None, duration: int = None):
    '''Create a MeetingType on the agenda.

    When duration is not given it is derived from slug: 'name:30' gives slug
    'name' and duration 30, while a bare number (int or str) gives that
    duration and the slug 'mt-<duration>'. The label is only passed to the
    model when it is not empty. The meeting type is exposed on the agenda as
    a '_<slug>' attribute, dashes replaced by underscores.
    '''
    if duration is None:
        has_inline_duration = not isinstance(slug, int) and ':' in slug
        if has_inline_duration:
            slug, raw_duration = slug.split(':')
            duration = int(raw_duration)
        else:
            duration = int(slug)
            slug = f'mt-{duration}'

    fields = {'agenda': agenda, 'slug': slug, 'duration': duration}
    if label:
        fields['label'] = label
    meeting_type = MeetingType.objects.create(**fields)
    setattr(agenda, '_' + meeting_type.slug.replace('-', '_'), meeting_type)
    return meeting_type
def add_day_timeperiod(
    target: typing.Union[Desk, Agenda],
    start: typing.Union[datetime.datetime, Interval],
    end: typing.Union[datetime.datetime, int] = None,
):
    '''Add a dated (single day) time period to a desk.

    target: a Desk, or a non-events Agenda whose desk is fetched with
        desk_set.get().
    start: start datetime, or an Interval unpacked into (start, end).
    end: end datetime, or a number of minutes added to start.

    start and end must fall on the same date once passed through
    localtime(); the time period stores that date and the naive local times.
    '''
    if isinstance(target, Desk):
        desk = target
    else:
        assert target.kind != 'events'
        desk = target.desk_set.get()
    if isinstance(start, Interval):
        start, end = start
    elif isinstance(end, int):
        # an integer end is a duration in minutes
        end = start + datetime.timedelta(minutes=end)
    assert localtime(start).date() == localtime(end).date()
    desk.timeperiod_set.create(
        date=localtime(start).date(),
        start_time=localtime(start).time().replace(tzinfo=None),
        end_time=localtime(end).time().replace(tzinfo=None),
    )
def add_exception(
    target: typing.Union[Desk, Agenda, UnavailabilityCalendar],
    start_datetime: typing.Union[datetime.datetime, Interval],
    end_datetime: typing.Union[datetime.datetime, int] = None,
    label: str = None,
):
    '''Get or create a TimePeriodException on a desk or unavailability calendar.

    target: a Desk, an UnavailabilityCalendar, or a non-events Agenda whose
        desk is fetched with desk_set.get().
    start_datetime: start datetime, or an Interval unpacked into
        (start_datetime, end_datetime).
    end_datetime: end datetime, or a number of minutes added to
        start_datetime.
    label: optional label; when set the exception is also stored on target as
        the attribute '_' + slugify(label).

    Returns the TimePeriodException.
    '''
    desk = None
    unavailability_calendar = None
    if isinstance(target, Desk):
        desk = target
    elif isinstance(target, UnavailabilityCalendar):
        unavailability_calendar = target
    else:
        assert target.kind != 'events'
        desk = target.desk_set.get()
    if isinstance(start_datetime, Interval):
        start_datetime, end_datetime = start_datetime
    elif isinstance(end_datetime, int):
        # an integer end is a duration in minutes
        end_datetime = start_datetime + datetime.timedelta(minutes=end_datetime)
    # NOTE(review): unavailability_calendar__isnull=True is combined with
    # unavailability_calendar=<target>; for an UnavailabilityCalendar target
    # the lookup presumably never matches and a new row is always created.
    # Confirm this is intended.
    timeperiodexception, _ = TimePeriodException.objects.get_or_create(
        desk=desk,
        unavailability_calendar=unavailability_calendar,
        label=label,
        start_datetime=start_datetime,
        end_datetime=end_datetime,
        unavailability_calendar__isnull=True,
        group__isnull=True,
    )
    if label:
        setattr(target, f'_{slugify(label)}', timeperiodexception)
    return timeperiodexception
def add_exception_source(target: typing.Union[Desk, Agenda], ics_filename: str, ics_file):
    '''Create a TimePeriodExceptionSource from ICS content.

    target: a Desk, an UnavailabilityCalendar, or a non-events Agenda whose
        desk is fetched with desk_set.get().
    ics_file: the ICS content; a str is encoded to bytes and bytes are
        wrapped in a ContentFile named ics_filename, any other value is
        passed through unchanged.

    The source is also stored on target as an attribute whose name is '_'
    followed by the slugified file name (dots replaced by underscores first).
    '''
    desk = target if isinstance(target, Desk) else None
    unavailability_calendar = None
    if desk is None:
        if isinstance(target, UnavailabilityCalendar):
            unavailability_calendar = target
        else:
            assert target.kind != 'events'
            desk = target.desk_set.get()

    content = ics_file.encode() if isinstance(ics_file, str) else ics_file
    if isinstance(content, bytes):
        content = ContentFile(content, name=ics_filename)

    source = TimePeriodExceptionSource.objects.create(
        desk=desk,
        unavailability_calendar=unavailability_calendar,
        ics_filename=ics_filename,
        ics_file=content,
    )
    attribute = '_' + slugify(ics_filename.replace('.', '_'))
    setattr(target, attribute, source)
    return source
def add_meeting(
    target: typing.Union[Desk, Agenda],
    start_datetime: datetime.datetime,
    meeting_type: typing.Union[str, MeetingType] = None,
    **kwargs,
):
    '''Create a booked meeting event on a desk.

    target: a Desk, or a non-events Agenda whose desk is fetched with
        desk_set.get().
    meeting_type: a MeetingType, the slug of one of the agenda's meeting
        types, or None to use the agenda's meeting type fetched with
        meetingtype_set.get().
    Other keyword arguments are passed to Booking.objects.create().

    Returns the event; its booking is available as event.booking.
    '''
    desk = target
    if not isinstance(target, Desk):
        assert target.kind != 'events'
        desk = target.desk_set.get()

    if meeting_type is None:
        meeting_type = desk.agenda.meetingtype_set.get()
    elif isinstance(meeting_type, str):
        meeting_type = desk.agenda.meetingtype_set.get(slug=meeting_type)

    event_fields = {
        'agenda': desk.agenda,
        'desk': desk,
        'meeting_type': meeting_type,
        'start_datetime': start_datetime,
        'full': False,
        'places': 1,
    }
    event = Event.objects.create(**event_fields)
    event.booking = Booking.objects.create(event=event, **kwargs)
    return event
# Timezones applied by the paris() and utc() helpers of this module.
PARIS_TZ = zoneinfo.ZoneInfo('Europe/Paris')
UTC = zoneinfo.ZoneInfo('UTC')
def datetime_in_tz(s, tz):
    '''Parse a naive ISO datetime string and attach the timezone tz.

    s is an ISO 8601 date or datetime without UTC offset, optionally followed
    by a space and a duration in hours ('2h') or minutes ('30m'):

        2023-05-01 10:00        -> datetime
        2023-05-01              -> datetime at midnight
        2023-05-01 10:00 30m    -> Interval(start, start + 30 minutes)

    A last space separated token without ':' is taken as the duration.

    Returns an aware datetime, or an Interval when a non-zero duration is
    given. Raises ValueError when the duration suffix or the datetime is
    malformed and AssertionError when s carries a timezone offset.
    '''
    duration = None
    head, separator, last = s.rpartition(' ')
    if separator and ':' not in last:
        if last.endswith('h'):
            duration = datetime.timedelta(hours=int(last[:-1]))
        elif last.endswith('m'):
            duration = datetime.timedelta(minutes=int(last[:-1]))
        else:
            # report the whole input, not only the part before the bad suffix
            raise ValueError(f'bad datetime string: {s}')
        s = head
    dt = datetime.datetime.fromisoformat(s)
    assert not dt.tzinfo, 's must not contain timezone offset'
    dt = dt.replace(tzinfo=tz)
    if duration:
        return Interval(dt, dt + duration)
    return dt
def build_agendas(description):
    '''Mini-language to build agendas.

    description is an indentation structured text with one node per line.
    Tokens are separated by spaces, quoted strings may contain spaces, a
    token starting with '#' ends the line and lines without tokens are
    skipped. Recognized nodes:

        meetings LABEL [key=value ...] [DURATION ...]
            desk LABEL [DAYS ... OPENINGS ...]
                timeperiod DAYS ... OPENINGS ...
                exception LABEL [START [END]]
                    key EXPRESSION          (only when START is not inline)
                exception-source FILENAME EXPRESSION
                unavailability-calendar LABEL [SLUG]
            meeting-type SLUG [DURATION]
            meeting-type LABEL SLUG DURATION
            reminder-setting key=value ...
        virtual LABEL [key=value ...] [DURATION ...]
            (every child is parsed like a meetings node)
        unavailability-calendar LABEL [SLUG]
            exception ...
            exception-source ...

    Values of key=value tokens and EXPRESSION/START/END are evaluated with
    eval() using the globals and locals of the caller, so names visible at
    the call site can be used in them. Only use with trusted descriptions.

    Returns the built object when there is exactly one top-level node, else
    the list of built objects. Raises SyntaxError, with the surrounding lines
    in the message, on indentation problems or when building a top-level
    node fails. A description without any node is not supported (IndexError).
    '''
    regexp = re.compile(r'(?:\'[^\']+\'|"[^"]+"|[^ \'"]+)+')
    # frame of the caller, its namespaces are used to evaluate expressions;
    # must be fetched directly in this function for the depth to be right
    call_frame = sys._getframe(1)
    description = textwrap.dedent(description)

    class Node(typing.NamedTuple):
        tokens: list[str]
        nodes: list
        line_number: int

        @classmethod
        def from_tokens(cls, tokens, line_number):
            return cls(tokens=tokens, nodes=[], line_number=line_number)

    class Stack(typing.NamedTuple):
        level: int
        nodes: list

    nodes = []
    stack = [Stack(0, nodes)]
    lines = list(enumerate(description.splitlines(), start=1))

    def eval_value(value):
        return eval(value, call_frame.f_globals, call_frame.f_locals)  # pylint: disable=eval-used

    def eval_label(value):
        # strip the surrounding quotes of a quoted label
        if value[0] in '"\'':
            return value[1:-1]
        return value

    def syntax_error(msg, line_number=None, node=None):
        # build (do not raise) a SyntaxError showing the lines around the error
        line_number = line_number or node.line_number
        parts = [f'{msg} on line {line_number}']
        parts += [
            f'{"> " if i == line_number else " "}{i}: {line}'
            for i, line in lines[max(line_number - 4, 0) : min(line_number + 3, len(lines))]
        ]
        return SyntaxError('\n'.join(parts))

    # first pass: build the tree of nodes from the indentation
    for line_number, line in lines:
        indentation = len(line) - len(line.lstrip())
        tokens = []
        for token in regexp.findall(line.strip()):
            # comment
            if token.startswith('#'):
                break
            if token:
                tokens.append(token)
        if not tokens:
            continue
        level = stack[-1].level
        if indentation == level:
            pass
        elif indentation > level:  # indent
            if not stack[-1].nodes:
                # an indented line needs a previous node to be attached to
                raise syntax_error('unexpected indentation', line_number=line_number)
            stack.append(Stack(level=indentation, nodes=stack[-1].nodes[-1].nodes))
        else:  # deindent
            while indentation < level:
                stack.pop()
                level = stack[-1].level
            if level != indentation:
                raise syntax_error('indentation mismatch', line_number=line_number)
        stack[-1].nodes.append(Node.from_tokens(tokens, line_number))

    # second pass: convert nodes to keyword arguments of the build_*/add_* helpers
    def parse_unavailability_calendar(node):
        dummy, *rest = node.tokens
        try:
            label, slug, *rest = rest
            kwargs = {'label': label, 'slug': slug}
        except ValueError:
            (label,) = rest
            kwargs = {'label': label}
        exceptions = []
        exception_sources = []
        for child in node.nodes:
            kind, *dummy = child.tokens
            if kind == 'exception':
                exceptions.append(parse_exception(child))
            elif kind == 'exception-source':
                exception_sources.append(parse_exception_source(child))
        return {**kwargs, 'exceptions': exceptions, 'exception_sources': exception_sources}

    def parse_node(node):
        kind, *dummy = node.tokens
        try:
            if kind == 'meetings':
                return build_meetings_agenda(**parse_meetings(node))
            if kind == 'virtual':
                return build_virtual_agenda(**parse_virtual(node))
            if kind == 'unavailability-calendar':
                return build_unavailability_calendar(**parse_unavailability_calendar(node))
            raise ValueError('unknown node type')
        except Exception as e:
            raise syntax_error(e, node=node) from e

    def parse_kwargs(tokens):
        kwargs = {}
        for token in tokens:
            key, value = token.split('=', 1)
            kwargs[key] = eval_value(value)
        return kwargs

    def parse_agenda_kwargs(tokens):
        meeting_types = []
        kwargs = {'meeting_types': meeting_types}
        for token in tokens:
            # key=value tokens are only accepted before the first duration
            if not meeting_types:
                if '=' in token:
                    key, value = token.split('=', 1)
                    kwargs[key] = eval_value(value)
                    continue
            try:
                meeting_types.append(int(token))
            except ValueError as e:
                raise ValueError(f'invalid meeting type duration "{token}"') from e
        return kwargs

    def parse_exception(node):
        dummy, *rest = node.tokens
        # dispatch on the number of tokens instead of catching ValueError, so
        # that a ValueError raised by an evaluated expression is not masked
        if len(rest) == 1:
            kwargs = {'label': eval_label(rest[0])}
            for child in node.nodes:
                key, *value_tokens = child.tokens
                kwargs[key] = eval_value(' '.join(value_tokens))
            return kwargs
        # NOTE(review): inline forms keep the label as written, quotes
        # included, unlike the single token form; confirm this is intended.
        if len(rest) == 2:
            label, start_datetime = rest
            return {'label': label, 'start_datetime': eval_value(start_datetime)}
        if not rest:
            raise ValueError('missing exception label')
        label, start_datetime, end_datetime, *dummy = rest
        start_datetime = eval_value(start_datetime)
        end_datetime = eval_value(end_datetime)
        return {'label': label, 'start_datetime': start_datetime, 'end_datetime': end_datetime}

    def parse_exception_source(node):
        dummy, filename, *rest = node.tokens
        return {'ics_filename': filename, 'ics_file': eval_value(' '.join(rest))}

    def parse_desk(node):
        kind, label, *inline_timeperiod = node.tokens
        if kind != 'desk':
            raise syntax_error('invalid desk description', node=node)
        timeperiods = []
        exceptions = []
        exception_sources = []
        unavailability_calendars = []
        for child in node.nodes:
            kind, *dummy = child.tokens
            if kind == 'timeperiod':
                timeperiods.append(parse_timeperiod(child))
            elif kind == 'exception':
                exceptions.append(parse_exception(child))
            elif kind == 'exception-source':
                exception_sources.append(parse_exception_source(child))
            elif kind == 'unavailability-calendar':
                unavailability_calendars.append(parse_unavailability_calendar(child))
            else:
                raise ValueError('unknown desk child node')
        if inline_timeperiod:
            # the timeperiod written on the desk line comes first
            timeperiods.insert(0, inline_timeperiod)
        return {
            'label': eval_label(label),
            'timeperiods': timeperiods,
            'exceptions': exceptions,
            'exception_sources': exception_sources,
            'unavailability_calendars': unavailability_calendars,
        }

    def parse_timeperiod(node):
        kind, *defn = node.tokens
        if kind != 'timeperiod':
            raise syntax_error('invalid timeperiod description', node=node)
        return defn

    def parse_mt(node):
        dummy, *rest = node.tokens
        if len(rest) == 1:
            return rest[0]
        if len(rest) == 2:
            return {'slug': rest[0], 'duration': int(rest[1])}
        return {'label': eval_label(rest[0]), 'slug': rest[1], 'duration': int(rest[2])}

    def parse_reminder_setting(node):
        dummy, *rest = node.tokens
        return parse_kwargs(rest)

    def parse_meetings(node):
        dummy, label, *rest = node.tokens
        kwargs = parse_agenda_kwargs(rest)
        desks = []
        meeting_types = kwargs.setdefault('meeting_types', [])
        for child in node.nodes:
            kind, *dummy = child.tokens
            if kind == 'desk':
                desks.append(parse_desk(child))
            elif kind == 'meeting-type':
                meeting_types.append(parse_mt(child))
            elif kind == 'reminder-setting':
                kwargs.setdefault('reminder_settings', []).append(parse_reminder_setting(child))
            else:
                raise ValueError('unknown meeting children')
        return {**kwargs, 'label': eval_label(label), 'desks': desks}

    def parse_virtual(node):
        dummy, label, *rest = node.tokens
        kwargs = parse_agenda_kwargs(rest)
        agendas = [parse_meetings(child) for child in node.nodes]
        return {**kwargs, 'label': eval_label(label), 'agendas': agendas}

    objects = [parse_node(node) for node in nodes]
    if len(objects) > 1:
        return objects
    return objects[0]
def paris(s):
    '''Parse s with datetime_in_tz() using the Europe/Paris timezone.'''
    return datetime_in_tz(s, tz=PARIS_TZ)
def utc(s):
    '''Parse s with datetime_in_tz() using the UTC timezone.'''
    return datetime_in_tz(s, tz=UTC)