diff --git a/tests/test_agendas.py b/tests/test_agendas.py index 7fdaaa9b..dfb69f29 100644 --- a/tests/test_agendas.py +++ b/tests/test_agendas.py @@ -38,6 +38,8 @@ from chrono.agendas.models import ( ) from chrono.utils.timezone import localtime, make_aware, make_naive, now +from .utils import add_meeting, build_agendas, utc + pytestmark = pytest.mark.django_db ICS_SAMPLE = """BEGIN:VCALENDAR @@ -209,7 +211,7 @@ def test_agenda_minimal_booking_delay(freezer): def test_agenda_minimal_booking_delay_in_working_days(settings, freezer): freezer.move_to('2021-07-09') - agenda = Agenda.objects.create(label='Agenda', minimal_booking_delay=1) + agenda = build_agendas('meetings Agenda minimal_booking_delay=1') settings.WORKING_DAY_CALENDAR = None agenda.minimal_booking_delay_in_working_days = True @@ -298,7 +300,7 @@ def delay_parameter_to_label(argvalue): ) def test_agenda_minimal_booking_delay_no_minimal_booking_time(freezer, current_time, min_booking_datetime): freezer.move_to(current_time) - agenda = Agenda.objects.create(label='Agenda', minimal_booking_delay=4, minimal_booking_time=None) + agenda = build_agendas('meetings Agenda minimal_booking_delay=4 minimal_booking_time=None') assert make_naive(agenda.min_booking_datetime) == min_booking_datetime @@ -694,13 +696,14 @@ def test_meeting_type_slugs(): def test_timeperiodexception_creation_from_ics(): - agenda = Agenda.objects.create(label='Test 1 agenda') - desk = Desk.objects.create(label='Test 1 desk', agenda=agenda) - source = desk.timeperiodexceptionsource_set.create( - ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE, name='sample.ics') + agenda = build_agendas( + '''\ +meetings "Test 1 agenda" + desk "Test 1 desk" + exception-source sample.ics ICS_SAMPLE''' ) - source.refresh_timeperiod_exceptions_from_ics() - assert TimePeriodException.objects.filter(desk=desk).count() == 2 + agenda._test_1_desk._sample_ics.refresh_timeperiod_exceptions_from_ics() + assert TimePeriodException.objects.filter(desk=agenda._test_1_desk).count() == 2 def test_timeperiodexception_creation_from_ics_without_startdt(): @@ -1525,8 +1528,13 @@ def test_desk_duplicate_exception_sources(): } ) def test_desk_duplicate_exception_source_from_settings(): - agenda = Agenda.objects.create(label='Agenda') - desk = Desk.objects.create(label='Desk', agenda=agenda) + agenda = build_agendas( + '''\ + meetings Agenda + desk Desk + ''' + ) + desk = agenda._desk desk.import_timeperiod_exceptions_from_settings(enable=True) source = desk.timeperiodexceptionsource_set.get(settings_slug='holidays') @@ -2254,30 +2262,25 @@ def test_agenda_reminders_templated_content(mailoutbox, freezer): @override_settings(TIME_ZONE='UTC') def test_agenda_reminders_meetings(mailoutbox, freezer): - freezer.move_to('2020-01-01 11:00') - agenda = Agenda.objects.create(label='Events', kind='meetings') - desk = Desk.objects.create(agenda=agenda, label='Desk') - meetingtype = MeetingType.objects.create(agenda=agenda, label='Bar', duration=30) - TimePeriod.objects.create( - desk=desk, weekday=now().weekday(), start_time=datetime.time(10, 0), end_time=datetime.time(18, 0) + freezer.move_to(utc('2020-01-01 11:00')) + agenda = build_agendas( + '''\ + meetings Events + desk Desk + timeperiod wednesday 10:00-18:00 + meeting-type Bar 30 + reminder-setting days_before_email=2 + ''' ) - AgendaReminderSettings.objects.create(agenda=agenda, days_before_email=2) - - event = Event.objects.create( - agenda=agenda, - places=1, - desk=desk, - meeting_type=meetingtype, - start_datetime=now() + datetime.timedelta(days=5), # 06/01 - ) - Booking.objects.create( - event=event, + add_meeting( + agenda, + start_datetime=utc('2020-01-06T11:00:00'), user_email='t@test.org', user_display_label='Birth certificate', form_url='publik://default/someform/1/', ) - freezer.move_to('2020-01-04 15:00') + freezer.move_to(utc('2020-01-04 15:00')) call_command('send_booking_reminders') assert len(mailoutbox) == 1 diff --git a/tests/test_utils.py b/tests/test_utils.py index 77f95d29..cc851226 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,13 +2,16 @@ import datetime import json from unittest import mock +import pytest from requests.exceptions import ConnectionError from requests.models import Response -from chrono.agendas.models import Agenda +from chrono.agendas.models import Agenda, TimePeriod from chrono.utils.date import get_weekday_index from chrono.utils.lingo import CheckType, get_agenda_check_types +from .utils import build_agendas, build_meetings_agenda, build_virtual_agenda, paris, utc + def test_get_weekday_index(): for date in ( @@ -90,3 +93,176 @@ def test_get_agenda_check_types(): CheckType(slug='bar-reason', label='Bar reason', kind='presence'), CheckType(slug='foo-reason', label='Foo reason', kind='absence'), ] + + +def test_build_meetings_agenda(db): + agenda = build_meetings_agenda( + meeting_types=[30], resources=['Re1'], desks=('Desk 1', 'monday-friday 09:00-12:00 14:00-17:00') + ) + assert agenda.slug == 'agenda' + assert agenda.label == 'Agenda' + assert agenda._mt_30 + assert list(agenda.meetingtype_set.all()) == [agenda._mt_30] + assert agenda.desk_set.count() == 1 + assert agenda.desk_set.all()[0].slug == 'desk-1' + timeperiods = agenda.desk_set.all()[0].timeperiod_set.all() + assert timeperiods.count() == 10 + assert set(timeperiods.values_list('weekday', flat=True)) == set(range(0, 5)) + assert set(timeperiods.values_list('start_time', 'end_time')) == { + (datetime.time(9), datetime.time(12)), + (datetime.time(14), datetime.time(17)), + } + assert agenda.resources.count() == 1 + assert agenda.resources.get().label == 'Re1' + assert agenda.resources.get().slug == 're1' + + +def test_build_meetings_agenda_multiple_desks(db): + agenda = build_meetings_agenda( + meeting_types=[30], + desks={ + 'desk-1': ['monday-friday 09:00-12:00'], + 'desk-2': ['monday-friday 14:00-17:00'], + }, + ) + desks = agenda.desk_set.all() + assert set(desks.values_list('slug', flat=True)) == {'desk-1', 'desk-2'} + assert agenda.desk_set.all()[0].slug == 'desk-1' + timeperiods = TimePeriod.objects.filter(desk__in=desks) + assert timeperiods.count() == 10 + assert set(timeperiods.values_list('weekday', flat=True)) == set(range(0, 5)) + assert set(timeperiods.values_list('start_time', 'end_time')) == { + (datetime.time(9), datetime.time(12)), + (datetime.time(14), datetime.time(17)), + } + + +def test_build_virtual_agenda(db): + agenda = build_virtual_agenda( + agendas={ + 'Agenda 1': { + 'desks': ('Bureau 1', 'monday-friday 08:00-12:00 14:00-17:00'), + }, + 'Agenda 2': { + 'desks': ('Bureau 1', 'monday,tuesday 09:00-12:00'), + }, + 'Agenda 3': { + 'desks': ('Bureau 1', 'monday-friday 15:00-17:00'), + }, + }, + meeting_types=[30], + ) + + assert agenda._agenda_1 + assert agenda._agenda_1._mt_30 + assert agenda._agenda_2 + assert agenda._agenda_3 + assert agenda._mt_30 + + assert agenda.real_agendas.count() == 3 + timeperiods = TimePeriod.objects.filter(desk__agenda__in=agenda.real_agendas.all()) + assert timeperiods.count() == 17 + assert set(timeperiods.values_list('weekday', flat=True)) == set(range(0, 5)) + assert set(timeperiods.values_list('start_time', 'end_time')) == { + (datetime.time(8), datetime.time(12)), + (datetime.time(9), datetime.time(12)), + (datetime.time(14), datetime.time(17)), + (datetime.time(15), datetime.time(17)), + } + assert agenda._agenda_1 + + +def test_build_agendas(db): + # pylint: disable=unused-variable + ICS_SAMPLE = """BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//foo.bar//EN +BEGIN:VEVENT +DTSTAMP:20170824T082855Z +DTSTART:20170831T170800Z +DTEND:20170831T203400Z +SEQUENCE:1 +SUMMARY:Événement 1 +END:VEVENT +BEGIN:VEVENT +DTSTAMP:20170824T092855Z +DTSTART:20170830T180800Z +DTEND:20170831T223400Z +SEQUENCE:2 +END:VEVENT +END:VCALENDAR""" + + unavailability_calendar, agenda_1, agenda_2, virtual = build_agendas( + ''' +unavailability-calendar Congés + exception Noël + start_datetime paris('2023-12-25T00:00:00 24h') + exception-source sample.ics ICS_SAMPLE +meetings "Agenda 1" maximal_booking_delay=15 30 45 # comment 1 + # comment 2 + desk "Desk 1" + timeperiod monday-friday 08:00-12:00 + exception Grève + start_datetime paris('2023-04-01T01:01:01') + end_datetime paris('2023-04-01T01:01:01') + exception-source sample.ics ICS_SAMPLE + unavailability-calendar Congés + desk "Bureau 2" + timeperiod monday-friday 14:00-17:00 + +meetings 'Agenda 2' 30 + desk "Desk 1" monday-friday 08:00-12:00 + desk "Desk 2" monday,friday 14:00-17:00 + +virtual "Agenda 3" 30 + meetings CNI + desk bureau1 monday-friday 10:00-12:00 + meetings Passeport + desk bureau2 monday-friday 14:00-17:00 +''' + ) + assert unavailability_calendar.label == 'Congés' + assert unavailability_calendar._noel + assert unavailability_calendar._sample_ics + assert agenda_1.label == 'Agenda 1' + assert agenda_1.maximal_booking_delay == 15 + assert agenda_1._desk_1._greve + assert agenda_1._desk_1._sample_ics + assert agenda_1._desk_1 + assert agenda_1._desk_1._conges == unavailability_calendar + assert agenda_1._bureau_2 + assert agenda_1._mt_30 + assert agenda_1._mt_45 + assert agenda_2.label == 'Agenda 2' + assert agenda_2._desk_1 + assert agenda_2._desk_2 + assert agenda_2._mt_30 + assert virtual._cni._bureau1 + assert virtual._cni._mt_30 + assert virtual._mt_30 + assert virtual._passeport._bureau2 + assert virtual._passeport._mt_30 + + +def test_build_agendas_indentation_mismatch(db): + with pytest.raises(SyntaxError, match=r'on line 5'): + build_agendas( + ''' +agenda xxx + zobi dd + kkk + iii # here, bad indentation + +agenda "Agenda 1" 30 + desk 1 + desk 2 +''' + ) + + +def test_paris(): + assert paris('2023-04-19T11:00:00').isoformat() == '2023-04-19T11:00:00+02:00' + + +def test_utc(): + assert utc('2023-04-19T11:00:00').isoformat() == '2023-04-19T11:00:00+00:00' diff --git a/tests/utils.py b/tests/utils.py index 4a14f1b7..25ffc694 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,3 +1,46 @@ +# chrono - agendas system +# Copyright (C) 2023 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import datetime +import re +import sys +import textwrap +import typing +import zoneinfo + +from django.core.files.base import ContentFile +from django.utils.text import slugify + +from chrono.agendas.models import ( + Agenda, + AgendaReminderSettings, + Booking, + Desk, + Event, + MeetingType, + Resource, + TimePeriod, + TimePeriodException, + TimePeriodExceptionSource, + UnavailabilityCalendar, +) +from chrono.utils.interval import Interval +from chrono.utils.timezone import localtime + + def login(app, username='admin', password='admin'): login_page = app.get('/login/') login_form = login_page.forms[0] @@ -6,3 +49,549 @@ def login(app, username='admin', password='admin'): resp = login_form.submit() assert resp.status_int == 302 return app + + +def build_timeperiods(desk, *timeperiods): + '''Build timeperiods for a desk. timperiods is a list of string with + format: + + monday-friday;9:00-12:00,14:00-17:00 + saturday;9:00-12:00 + + ''' + for defn in timeperiods: + if isinstance(defn, str): + defn = defn.strip() + parts = defn.split() + else: + parts = defn + + days = [] + openings = [] + + first_opening = ([i for i, part in enumerate(parts) if part[:1].isdigit()] or [len(parts)])[0] + days = parts[:first_opening] + openings = parts[first_opening:] + assert days and openings, defn + for day in days: + day_to_weekday = { + 'monday': 0, + 'tuesday': 1, + 'wednesday': 2, + 'thursday': 3, + 'friday': 4, + 'saturday': 5, + 'sunday': 6, + } + if '-' in day: + begin, end = day.split('-') + begin_weekday = day_to_weekday[begin] + end_weekday = day_to_weekday[end] + assert begin_weekday < end_weekday, f'{begin} must be before {end}: {day}' + weekdays = range(begin_weekday, end_weekday + 1) + else: + weekdays = [datetime.datetime.strptime(d, '%A').weekday() for d in day.split(',')] + for weekday in weekdays: + for opening in openings: + start, end = opening.split('-') + start_time = datetime.datetime.strptime(start, '%H:%M').time() + end_time = datetime.datetime.strptime(end, '%H:%M').time() + TimePeriod.objects.create( + desk=desk, weekday=weekday, start_time=start_time, end_time=end_time + ) + + +def build_agenda(kind, label='Agenda', slug=None, **kwargs): + agenda_kwargs = { + 'label': label, + 'slug': slug or slugify(label), + 'kind': kind, + } + agenda_kwargs.update(kwargs) + return Agenda.objects.create(**agenda_kwargs) + + +def build_meetings_agenda( + label='Agenda', + slug=None, + meeting_types: typing.Union[int, tuple[str, int]] = None, + desks: typing.Union[str, dict] = None, + resources=None, + reminder_settings: list = None, + **kwargs, +): + agenda = build_agenda('meetings', label=label, slug=slug, **(kwargs or {})) + + for meeting_type in meeting_types or []: + if isinstance(meeting_type, (int, str)): + meeting_type = {'slug': meeting_type} + add_meeting_type(agenda, **meeting_type) + + if isinstance(desks, tuple): + desks = [desks] + if desks and isinstance(desks, list) and isinstance(desks[0], tuple): + desks = dict(desks) + if isinstance(desks, dict): + desks = [{'label': key, 'timeperiods': value} for key, value in desks.items()] + for desk_kwargs in desks or []: + add_desk(agenda, **desk_kwargs) + for reminder_setting in reminder_settings or []: + AgendaReminderSettings.objects.create(agenda=agenda, **reminder_setting) + for label in resources or []: + slug = slugify(label) + resource, _ = Resource.objects.get_or_create(slug=slugify(label), defaults={'label': label}) + agenda.resources.add(resource) + setattr(agenda, f'_re_{slug.replace("-", "_")}', resource) + return agenda + + +def build_virtual_agenda( + label: str = 'Agenda', + slug: str = None, + agendas: list[Agenda] = None, + meeting_types: list = None, + **kwargs, +): + agenda = build_agenda('virtual', label=label, slug=slug, **(kwargs or {})) + + if isinstance(agendas, dict): + agendas = [{'label': label, **defn} for label, defn in agendas.items()] + + for agenda_defn in agendas or []: + if isinstance(agenda_defn, Agenda): + real_agenda = agenda_defn + else: + agenda_defn['meeting_types'] = agenda_defn.get('meeting_types') or meeting_types + real_agenda = build_meetings_agenda(**agenda_defn) + agenda.real_agendas.add(real_agenda) + setattr(agenda, f'_{real_agenda.slug.replace("-", "_")}', real_agenda) + + for virtual_meeting_type in agenda.iter_meetingtypes(): + virtual_meeting_type.agenda = agenda + setattr(agenda, f'_{virtual_meeting_type.slug.replace("-", "_")}', virtual_meeting_type) + + return agenda + + +def build_unavailability_calendar(label: str, slug: str = None, exceptions=None, exception_sources=None): + slug = slug or slugify(label) + unavailability_calendar, dummy = UnavailabilityCalendar.objects.update_or_create( + slug=slug, defaults={'label': label} + ) + for exception in exceptions or []: + add_exception(unavailability_calendar, **exception) + for exception in exception_sources or []: + add_exception_source(unavailability_calendar, **exception) + return unavailability_calendar + + +def add_unavailability_calendar(desk: Desk, **kwargs): + unavailability_calendar = build_unavailability_calendar(**kwargs) + unavailability_calendar.desks.add(desk) + setattr(desk, f'_{unavailability_calendar.slug.replace("-", "_")}', unavailability_calendar) + return unavailability_calendar + + +def add_desk( + agenda: Agenda, + label: str, + timeperiods=None, + exceptions=None, + exception_sources=None, + unavailability_calendars=None, +): + desk = Desk.objects.create(label=label, agenda=agenda, slug=slugify(label)) + setattr(agenda, f'_{desk.slug.replace("-", "_")}', desk) + if timeperiods: + if not isinstance(timeperiods, list): + timeperiods = [timeperiods] + try: + build_timeperiods(desk, *timeperiods) + except Exception as e: + raise ValueError(f'invalid timeperiods "{e}": {timeperiods}') + for exception in exceptions or []: + add_exception(desk, **exception) + for exception_source in exception_sources or []: + add_exception_source(desk, **exception_source) + for unavailability_calendar in unavailability_calendars or []: + add_unavailability_calendar(desk, **unavailability_calendar) + return desk + + +def add_meeting_type(agenda: Agenda, slug: str, duration: int = None): + if duration is None: + if isinstance(slug, int) or ':' not in slug: + duration = int(slug) + slug = f'mt-{duration}' + else: + slug, duration = slug.split(':') + duration = int(duration) + + meeting_type = MeetingType.objects.create(agenda=agenda, slug=slug, duration=duration) + setattr(agenda, f'_{meeting_type.slug.replace("-", "_")}', meeting_type) + return meeting_type + + +def add_day_timeperiod( + target: typing.Union[Desk, Agenda], + start: typing.Union[datetime.datetime, Interval], + end: typing.Union[datetime.datetime, int] = None, +): + if isinstance(target, Desk): + desk = target + else: + assert target.kind != 'events' + desk = target.desk_set.get() + + if isinstance(start, Interval): + start, end = start + elif isinstance(end, int): + end = start + datetime.timdelta(minutes=end) + + assert localtime(start).date() == localtime(end).date() + + desk.timeperiod_set.create( + date=localtime(start).date(), + start_time=localtime(start).time().replace(tzinfo=None), + end_time=localtime(end).time().replace(tzinfo=None), + ) + + +def add_exception( + target: typing.Union[Desk, Agenda, UnavailabilityCalendar], + start_datetime: typing.Union[datetime.datetime, Interval], + end_datetime: typing.Union[datetime.datetime, int] = None, + label: str = None, +): + desk = None + unavailability_calendar = None + if isinstance(target, Desk): + desk = target + elif isinstance(target, UnavailabilityCalendar): + unavailability_calendar = target + else: + assert target.kind != 'events' + desk = target.desk_set.get() + + if isinstance(start_datetime, Interval): + start_datetime, end_datetime = start_datetime + elif isinstance(end_datetime, int): + end_datetime = start_datetime + datetime.timdelta(minutes=end_datetime) + + timeperiodexception, _ = TimePeriodException.objects.get_or_create( + desk=desk, + unavailability_calendar=unavailability_calendar, + label=label, + start_datetime=start_datetime, + end_datetime=end_datetime, + unavailability_calendar__isnull=True, + group__isnull=True, + ) + if label: + setattr(target, f'_{slugify(label)}', timeperiodexception) + return timeperiodexception + + +def add_exception_source(target: typing.Union[Desk, Agenda], ics_filename: str, ics_file): + desk = None + unavailability_calendar = None + if isinstance(target, Desk): + desk = target + elif isinstance(target, UnavailabilityCalendar): + unavailability_calendar = target + else: + assert target.kind != 'events' + desk = target.desk_set.get() + + if isinstance(ics_file, str): + ics_file = ics_file.encode() + if isinstance(ics_file, bytes): + ics_file = ContentFile(ics_file, name=ics_filename) + + timeperiodexceptionsource = TimePeriodExceptionSource.objects.create( + desk=desk, + unavailability_calendar=unavailability_calendar, + ics_filename=ics_filename, + ics_file=ics_file, + ) + setattr(target, f'_{slugify(ics_filename.replace(".", "_"))}', timeperiodexceptionsource) + return timeperiodexceptionsource + + +def add_meeting( + target: typing.Union[Desk, Agenda], + start_datetime: datetime.datetime, + meeting_type: MeetingType = None, + **kwargs, +): + if isinstance(target, Desk): + desk = target + else: + assert target.kind != 'events' + desk = target.desk_set.get() + + if meeting_type is None: + meeting_type = desk.agenda.meetingtype_set.get() + + event = Event.objects.create( + agenda=desk.agenda, + desk=desk, + meeting_type=meeting_type, + start_datetime=start_datetime, + full=False, + places=1, + ) + booking = Booking.objects.create(event=event, **kwargs) + event.booking = booking + return event + + +PARIS_TZ = zoneinfo.ZoneInfo('Europe/Paris') + +UTC = zoneinfo.ZoneInfo('UTC') + + +def datetime_in_tz(s, tz): + duration = None + if ':' not in s.rsplit(' ')[-1]: + s, duration = s.rsplit(' ', 1) + if duration.endswith('h'): + duration = datetime.timedelta(hours=int(duration[:-1])) + elif duration.endswith('m'): + duration = datetime.timedelta(minutes=int(duration[:-1])) + else: + raise ValueError(f'bad datetime string: {s}') + dt = datetime.datetime.fromisoformat(s) + assert not dt.tzinfo, 's must not contain timezone offset' + dt = dt.replace(tzinfo=tz) + if duration: + return Interval(dt, dt + duration) + return dt + + +def build_agendas(description): + '''Mini-language to build agendas''' + regexp = re.compile(r'(?:\'[^\']+\'|"[^"]+"|[^ \'"]+)+') + call_frame = sys._getframe(1) + description = textwrap.dedent(description) + + class Node(typing.NamedTuple): + tokens: list[str] + nodes: list + line_number: int + + @classmethod + def from_tokens(cls, tokens, line_number): + return cls(tokens=tokens, nodes=[], line_number=line_number) + + class Stack(typing.NamedTuple): + level: int + nodes: list + + nodes = [] + stack = [Stack(0, nodes)] + lines = list(enumerate(description.splitlines(), start=1)) + + def eval_value(value): + return eval(value, call_frame.f_globals, call_frame.f_locals) # pylint: disable=eval-used + + def eval_label(value): + if value[0] in '"\'': + return value[1:-1] + return value + + def syntax_error(msg, line_number=None, node=None): + line_number = line_number or node.line_number + parts = [f'{msg} on line {line_number}'] + parts += [ + f'{"> " if i == line_number else " "}{i}: {line}' + for i, line in lines[max(line_number - 4, 0) : min(line_number + 3, len(lines))] + ] + return SyntaxError('\n'.join(parts)) + + for line_number, line in lines: + indentation = len(line) - len(line.lstrip()) + tokens = [] + for token in regexp.findall(line.strip()): + # comment + if token.startswith('#'): + break + if token: + tokens.append(token) + if not tokens: + continue + level = stack[-1].level + if indentation == stack[-1].level: + pass + elif indentation > level: # indent + stack.append(Stack(level=indentation, nodes=stack[-1].nodes[-1].nodes)) + else: # deindent + while indentation < level: + stack.pop() + level = stack[-1].level + if level != indentation: + raise syntax_error('indentation mismatch', line_number=line_number) + stack[-1].nodes.append(Node.from_tokens(tokens, line_number)) + + def parse_unavailability_calendar(node): + dummy, *rest = node.tokens + try: + label, slug, *rest = rest + kwargs = {'label': label, 'slug': slug} + except ValueError: + (label,) = rest + kwargs = {'label': label} + exceptions = [] + exception_sources = [] + for node in node.nodes: + kind, *dummy = node.tokens + if kind == 'exception': + exceptions.append(parse_exception(node)) + elif kind == 'exception-source': + exception_sources.append(parse_exception_source(node)) + return {**kwargs, 'exceptions': exceptions, 'exception_sources': exception_sources} + + def parse_node(node): + kind, *dummy = node.tokens + try: + if kind == 'meetings': + return build_meetings_agenda(**parse_meetings(node)) + if kind == 'virtual': + return build_virtual_agenda(**parse_virtual(node)) + if kind == 'unavailability-calendar': + return build_unavailability_calendar(**parse_unavailability_calendar(node)) + raise ValueError('unknown node type') + except Exception as e: + raise syntax_error(e, node=node) + + def parse_kwargs(tokens): + kwargs = {} + for token in tokens: + key, value = token.split('=', 1) + kwargs[key] = eval_value(value) + return kwargs + + def parse_agenda_kwargs(tokens): + meeting_types = [] + kwargs = {'meeting_types': meeting_types} + for token in tokens: + if not meeting_types: + if '=' in token: + key, value = token.split('=', 1) + kwargs[key] = eval_value(value) + continue + try: + meeting_types.append(int(token)) + except Exception: + raise ValueError(f'invalid meeting type duration "{token}"') + return kwargs + + def parse_exception(node): + dummy, *rest = node.tokens + kwargs = {} + try: + (label,) = rest + kwargs['label'] = eval_label(rest[0]) + except ValueError: + try: + label, start_datetime = rest + start_datetime = eval_value(start_datetime) + return {'label': label, 'start_datetime': start_datetime} + except ValueError: + label, start_datetime, end_datetime, *rest = rest + start_datetime = eval_value(start_datetime) + end_datetime = eval_value(end_datetime) + return {'label': label, 'start_datetime': start_datetime, 'end_datetime': end_datetime} + else: + for node in node.nodes: + key, *rest = node.tokens + kwargs[key] = eval_value(' '.join(rest)) + return kwargs + + def parse_exception_source(node): + dummy, filename, *rest = node.tokens + return {'ics_filename': filename, 'ics_file': eval_value(' '.join(rest))} + + def parse_desk(node): + kind, label, *inline_timeperiod = node.tokens + if kind != 'desk': + raise syntax_error('invalid desk description', node=node) + timeperiods = [] + exceptions = [] + exception_sources = [] + unavailability_calendars = [] + for node in node.nodes: + kind, *dummy = node.tokens + if kind == 'timeperiod': + timeperiods.append(parse_timeperiod(node)) + elif kind == 'exception': + exceptions.append(parse_exception(node)) + elif kind == 'exception-source': + exception_sources.append(parse_exception_source(node)) + elif kind == 'unavailability-calendar': + unavailability_calendars.append(parse_unavailability_calendar(node)) + else: + raise ValueError('unknown desk child node') + + if inline_timeperiod: + timeperiods.insert(0, inline_timeperiod) + return { + 'label': eval_label(label), + 'timeperiods': timeperiods, + 'exceptions': exceptions, + 'exception_sources': exception_sources, + 'unavailability_calendars': unavailability_calendars, + } + + def parse_timeperiod(node): + kind, *defn = node.tokens + if kind != 'timeperiod': + raise syntax_error('invalid timeperiod description', node=node) + return defn + + def parse_mt(node): + dummy, slug, *rest = node.tokens + if rest: + return {'slug': slug, 'duration': int(rest[0])} + else: + return slug + + def parse_reminder_setting(node): + dummy, *rest = node.tokens + return parse_kwargs(rest) + + def parse_meetings(node): + dummy, label, *rest = node.tokens + kwargs = parse_agenda_kwargs(rest) + desks = [] + meeting_types = kwargs.setdefault('meeting_types', []) + for node in node.nodes: + kind, *rest = node.tokens + if kind == 'desk': + desks.append(parse_desk(node)) + elif kind == 'meeting-type': + meeting_types.append(parse_mt(node)) + elif kind == 'reminder-setting': + kwargs.setdefault('reminder_settings', []).append(parse_reminder_setting(node)) + else: + raise ValueError('unknown meeting children') + return {**kwargs, 'label': eval_label(label), 'desks': desks} + + def parse_virtual(node): + dummy, label, *rest = node.tokens + kwargs = parse_agenda_kwargs(rest) + return {**kwargs, 'label': eval_label(label), 'agendas': list(map(parse_meetings, node.nodes))} + + objects = [parse_node(node) for node in nodes] + if len(objects) > 1: + return objects + else: + return objects[0] + + +def paris(s): + return datetime_in_tz(s, tz=PARIS_TZ) + + +def utc(s): + return datetime_in_tz(s, tz=UTC)