tests: add helper functions to manage meetings agendas (#76335)

This commit is contained in:
Benjamin Dauvergne 2023-03-23 09:35:18 +01:00
parent fd28d075a5
commit 8278e6dca1
3 changed files with 797 additions and 29 deletions

View File

@ -38,6 +38,8 @@ from chrono.agendas.models import (
)
from chrono.utils.timezone import localtime, make_aware, make_naive, now
from .utils import add_meeting, build_agendas, utc
pytestmark = pytest.mark.django_db
ICS_SAMPLE = """BEGIN:VCALENDAR
@ -209,7 +211,7 @@ def test_agenda_minimal_booking_delay(freezer):
def test_agenda_minimal_booking_delay_in_working_days(settings, freezer):
freezer.move_to('2021-07-09')
agenda = Agenda.objects.create(label='Agenda', minimal_booking_delay=1)
agenda = build_agendas('meetings Agenda minimal_booking_delay=1')
settings.WORKING_DAY_CALENDAR = None
agenda.minimal_booking_delay_in_working_days = True
@ -298,7 +300,7 @@ def delay_parameter_to_label(argvalue):
)
def test_agenda_minimal_booking_delay_no_minimal_booking_time(freezer, current_time, min_booking_datetime):
freezer.move_to(current_time)
agenda = Agenda.objects.create(label='Agenda', minimal_booking_delay=4, minimal_booking_time=None)
agenda = build_agendas('meetings Agenda minimal_booking_delay=4 minimal_booking_time=None')
assert make_naive(agenda.min_booking_datetime) == min_booking_datetime
@ -694,13 +696,14 @@ def test_meeting_type_slugs():
def test_timeperiodexception_creation_from_ics():
agenda = Agenda.objects.create(label='Test 1 agenda')
desk = Desk.objects.create(label='Test 1 desk', agenda=agenda)
source = desk.timeperiodexceptionsource_set.create(
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE, name='sample.ics')
agenda = build_agendas(
'''\
meetings "Test 1 agenda"
desk "Test 1 desk"
exception-source sample.ics ICS_SAMPLE'''
)
source.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=desk).count() == 2
agenda._test_1_desk._sample_ics.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=agenda._test_1_desk).count() == 2
def test_timeperiodexception_creation_from_ics_without_startdt():
@ -1525,8 +1528,13 @@ def test_desk_duplicate_exception_sources():
}
)
def test_desk_duplicate_exception_source_from_settings():
agenda = Agenda.objects.create(label='Agenda')
desk = Desk.objects.create(label='Desk', agenda=agenda)
agenda = build_agendas(
'''\
meetings Agenda
desk Desk
'''
)
desk = agenda._desk
desk.import_timeperiod_exceptions_from_settings(enable=True)
source = desk.timeperiodexceptionsource_set.get(settings_slug='holidays')
@ -2254,30 +2262,25 @@ def test_agenda_reminders_templated_content(mailoutbox, freezer):
@override_settings(TIME_ZONE='UTC')
def test_agenda_reminders_meetings(mailoutbox, freezer):
freezer.move_to('2020-01-01 11:00')
agenda = Agenda.objects.create(label='Events', kind='meetings')
desk = Desk.objects.create(agenda=agenda, label='Desk')
meetingtype = MeetingType.objects.create(agenda=agenda, label='Bar', duration=30)
TimePeriod.objects.create(
desk=desk, weekday=now().weekday(), start_time=datetime.time(10, 0), end_time=datetime.time(18, 0)
freezer.move_to(utc('2020-01-01 11:00'))
agenda = build_agendas(
'''\
meetings Events
desk Desk
timeperiod wednesday 10:00-18:00
meeting-type Bar 30
reminder-setting days_before_email=2
'''
)
AgendaReminderSettings.objects.create(agenda=agenda, days_before_email=2)
event = Event.objects.create(
agenda=agenda,
places=1,
desk=desk,
meeting_type=meetingtype,
start_datetime=now() + datetime.timedelta(days=5), # 06/01
)
Booking.objects.create(
event=event,
add_meeting(
agenda,
start_datetime=utc('2020-01-06T11:00:00'),
user_email='t@test.org',
user_display_label='Birth certificate',
form_url='publik://default/someform/1/',
)
freezer.move_to('2020-01-04 15:00')
freezer.move_to(utc('2020-01-04 15:00'))
call_command('send_booking_reminders')
assert len(mailoutbox) == 1

View File

@ -2,13 +2,16 @@ import datetime
import json
from unittest import mock
import pytest
from requests.exceptions import ConnectionError
from requests.models import Response
from chrono.agendas.models import Agenda
from chrono.agendas.models import Agenda, TimePeriod
from chrono.utils.date import get_weekday_index
from chrono.utils.lingo import CheckType, get_agenda_check_types
from .utils import build_agendas, build_meetings_agenda, build_virtual_agenda, paris, utc
def test_get_weekday_index():
for date in (
@ -90,3 +93,176 @@ def test_get_agenda_check_types():
CheckType(slug='bar-reason', label='Bar reason', kind='presence'),
CheckType(slug='foo-reason', label='Foo reason', kind='absence'),
]
def test_build_meetings_agenda(db):
agenda = build_meetings_agenda(
meeting_types=[30], resources=['Re1'], desks=('Desk 1', 'monday-friday 09:00-12:00 14:00-17:00')
)
assert agenda.slug == 'agenda'
assert agenda.label == 'Agenda'
assert agenda._mt_30
assert list(agenda.meetingtype_set.all()) == [agenda._mt_30]
assert agenda.desk_set.count() == 1
assert agenda.desk_set.all()[0].slug == 'desk-1'
timeperiods = agenda.desk_set.all()[0].timeperiod_set.all()
assert timeperiods.count() == 10
assert set(timeperiods.values_list('weekday', flat=True)) == set(range(0, 5))
assert set(timeperiods.values_list('start_time', 'end_time')) == {
(datetime.time(9), datetime.time(12)),
(datetime.time(14), datetime.time(17)),
}
assert agenda.resources.count() == 1
assert agenda.resources.get().label == 'Re1'
assert agenda.resources.get().slug == 're1'
def test_build_meetings_agenda_multiple_desks(db):
agenda = build_meetings_agenda(
meeting_types=[30],
desks={
'desk-1': ['monday-friday 09:00-12:00'],
'desk-2': ['monday-friday 14:00-17:00'],
},
)
desks = agenda.desk_set.all()
assert set(desks.values_list('slug', flat=True)) == {'desk-1', 'desk-2'}
assert agenda.desk_set.all()[0].slug == 'desk-1'
timeperiods = TimePeriod.objects.filter(desk__in=desks)
assert timeperiods.count() == 10
assert set(timeperiods.values_list('weekday', flat=True)) == set(range(0, 5))
assert set(timeperiods.values_list('start_time', 'end_time')) == {
(datetime.time(9), datetime.time(12)),
(datetime.time(14), datetime.time(17)),
}
def test_build_virtual_agenda(db):
agenda = build_virtual_agenda(
agendas={
'Agenda 1': {
'desks': ('Bureau 1', 'monday-friday 08:00-12:00 14:00-17:00'),
},
'Agenda 2': {
'desks': ('Bureau 1', 'monday,tuesday 09:00-12:00'),
},
'Agenda 3': {
'desks': ('Bureau 1', 'monday-friday 15:00-17:00'),
},
},
meeting_types=[30],
)
assert agenda._agenda_1
assert agenda._agenda_1._mt_30
assert agenda._agenda_2
assert agenda._agenda_3
assert agenda._mt_30
assert agenda.real_agendas.count() == 3
timeperiods = TimePeriod.objects.filter(desk__agenda__in=agenda.real_agendas.all())
assert timeperiods.count() == 17
assert set(timeperiods.values_list('weekday', flat=True)) == set(range(0, 5))
assert set(timeperiods.values_list('start_time', 'end_time')) == {
(datetime.time(8), datetime.time(12)),
(datetime.time(9), datetime.time(12)),
(datetime.time(14), datetime.time(17)),
(datetime.time(15), datetime.time(17)),
}
assert agenda._agenda_1
def test_build_agendas(db):
# pylint: disable=unused-variable
ICS_SAMPLE = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20170824T082855Z
DTSTART:20170831T170800Z
DTEND:20170831T203400Z
SEQUENCE:1
SUMMARY:Événement 1
END:VEVENT
BEGIN:VEVENT
DTSTAMP:20170824T092855Z
DTSTART:20170830T180800Z
DTEND:20170831T223400Z
SEQUENCE:2
END:VEVENT
END:VCALENDAR"""
unavailability_calendar, agenda_1, agenda_2, virtual = build_agendas(
'''
unavailability-calendar Congés
exception Noël
start_datetime paris('2023-12-25T00:00:00 24h')
exception-source sample.ics ICS_SAMPLE
meetings "Agenda 1" maximal_booking_delay=15 30 45 # comment 1
# comment 2
desk "Desk 1"
timeperiod monday-friday 08:00-12:00
exception Grève
start_datetime paris('2023-04-01T01:01:01')
end_datetime paris('2023-04-01T01:01:01')
exception-source sample.ics ICS_SAMPLE
unavailability-calendar Congés
desk "Bureau 2"
timeperiod monday-friday 14:00-17:00
meetings 'Agenda 2' 30
desk "Desk 1" monday-friday 08:00-12:00
desk "Desk 2" monday,friday 14:00-17:00
virtual "Agenda 3" 30
meetings CNI
desk bureau1 monday-friday 10:00-12:00
meetings Passeport
desk bureau2 monday-friday 14:00-17:00
'''
)
assert unavailability_calendar.label == 'Congés'
assert unavailability_calendar._noel
assert unavailability_calendar._sample_ics
assert agenda_1.label == 'Agenda 1'
assert agenda_1.maximal_booking_delay == 15
assert agenda_1._desk_1._greve
assert agenda_1._desk_1._sample_ics
assert agenda_1._desk_1
assert agenda_1._desk_1._conges == unavailability_calendar
assert agenda_1._bureau_2
assert agenda_1._mt_30
assert agenda_1._mt_45
assert agenda_2.label == 'Agenda 2'
assert agenda_2._desk_1
assert agenda_2._desk_2
assert agenda_2._mt_30
assert virtual._cni._bureau1
assert virtual._cni._mt_30
assert virtual._mt_30
assert virtual._passeport._bureau2
assert virtual._passeport._mt_30
def test_build_agendas_indentation_mismatch(db):
with pytest.raises(SyntaxError, match=r'on line 5'):
build_agendas(
'''
agenda xxx
zobi dd
kkk
iii # here, bad indentation
agenda "Agenda 1" 30
desk 1
desk 2
'''
)
def test_paris():
assert paris('2023-04-19T11:00:00').isoformat() == '2023-04-19T11:00:00+02:00'
def test_utc():
assert utc('2023-04-19T11:00:00').isoformat() == '2023-04-19T11:00:00+00:00'

View File

@ -1,3 +1,46 @@
# chrono - agendas system
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import re
import sys
import textwrap
import typing
import zoneinfo
from django.core.files.base import ContentFile
from django.utils.text import slugify
from chrono.agendas.models import (
Agenda,
AgendaReminderSettings,
Booking,
Desk,
Event,
MeetingType,
Resource,
TimePeriod,
TimePeriodException,
TimePeriodExceptionSource,
UnavailabilityCalendar,
)
from chrono.utils.interval import Interval
from chrono.utils.timezone import localtime
def login(app, username='admin', password='admin'):
login_page = app.get('/login/')
login_form = login_page.forms[0]
@ -6,3 +49,549 @@ def login(app, username='admin', password='admin'):
resp = login_form.submit()
assert resp.status_int == 302
return app
def build_timeperiods(desk, *timeperiods):
'''Build timeperiods for a desk. timperiods is a list of string with
format:
monday-friday;9:00-12:00,14:00-17:00
saturday;9:00-12:00
'''
for defn in timeperiods:
if isinstance(defn, str):
defn = defn.strip()
parts = defn.split()
else:
parts = defn
days = []
openings = []
first_opening = ([i for i, part in enumerate(parts) if part[:1].isdigit()] or [len(parts)])[0]
days = parts[:first_opening]
openings = parts[first_opening:]
assert days and openings, defn
for day in days:
day_to_weekday = {
'monday': 0,
'tuesday': 1,
'wednesday': 2,
'thursday': 3,
'friday': 4,
'saturday': 5,
'sunday': 6,
}
if '-' in day:
begin, end = day.split('-')
begin_weekday = day_to_weekday[begin]
end_weekday = day_to_weekday[end]
assert begin_weekday < end_weekday, f'{begin} must be before {end}: {day}'
weekdays = range(begin_weekday, end_weekday + 1)
else:
weekdays = [datetime.datetime.strptime(d, '%A').weekday() for d in day.split(',')]
for weekday in weekdays:
for opening in openings:
start, end = opening.split('-')
start_time = datetime.datetime.strptime(start, '%H:%M').time()
end_time = datetime.datetime.strptime(end, '%H:%M').time()
TimePeriod.objects.create(
desk=desk, weekday=weekday, start_time=start_time, end_time=end_time
)
def build_agenda(kind, label='Agenda', slug=None, **kwargs):
agenda_kwargs = {
'label': label,
'slug': slug or slugify(label),
'kind': kind,
}
agenda_kwargs.update(kwargs)
return Agenda.objects.create(**agenda_kwargs)
def build_meetings_agenda(
label='Agenda',
slug=None,
meeting_types: typing.Union[int, tuple[str, int]] = None,
desks: typing.Union[str, dict] = None,
resources=None,
reminder_settings: list = None,
**kwargs,
):
agenda = build_agenda('meetings', label=label, slug=slug, **(kwargs or {}))
for meeting_type in meeting_types or []:
if isinstance(meeting_type, (int, str)):
meeting_type = {'slug': meeting_type}
add_meeting_type(agenda, **meeting_type)
if isinstance(desks, tuple):
desks = [desks]
if desks and isinstance(desks, list) and isinstance(desks[0], tuple):
desks = dict(desks)
if isinstance(desks, dict):
desks = [{'label': key, 'timeperiods': value} for key, value in desks.items()]
for desk_kwargs in desks or []:
add_desk(agenda, **desk_kwargs)
for reminder_setting in reminder_settings or []:
AgendaReminderSettings.objects.create(agenda=agenda, **reminder_setting)
for label in resources or []:
slug = slugify(label)
resource, _ = Resource.objects.get_or_create(slug=slugify(label), defaults={'label': label})
agenda.resources.add(resource)
setattr(agenda, f'_re_{slug.replace("-", "_")}', resource)
return agenda
def build_virtual_agenda(
label: str = 'Agenda',
slug: str = None,
agendas: list[Agenda] = None,
meeting_types: list = None,
**kwargs,
):
agenda = build_agenda('virtual', label=label, slug=slug, **(kwargs or {}))
if isinstance(agendas, dict):
agendas = [{'label': label, **defn} for label, defn in agendas.items()]
for agenda_defn in agendas or []:
if isinstance(agenda_defn, Agenda):
real_agenda = agenda_defn
else:
agenda_defn['meeting_types'] = agenda_defn.get('meeting_types') or meeting_types
real_agenda = build_meetings_agenda(**agenda_defn)
agenda.real_agendas.add(real_agenda)
setattr(agenda, f'_{real_agenda.slug.replace("-", "_")}', real_agenda)
for virtual_meeting_type in agenda.iter_meetingtypes():
virtual_meeting_type.agenda = agenda
setattr(agenda, f'_{virtual_meeting_type.slug.replace("-", "_")}', virtual_meeting_type)
return agenda
def build_unavailability_calendar(label: str, slug: str = None, exceptions=None, exception_sources=None):
slug = slug or slugify(label)
unavailability_calendar, dummy = UnavailabilityCalendar.objects.update_or_create(
slug=slug, defaults={'label': label}
)
for exception in exceptions or []:
add_exception(unavailability_calendar, **exception)
for exception in exception_sources or []:
add_exception_source(unavailability_calendar, **exception)
return unavailability_calendar
def add_unavailability_calendar(desk: Desk, **kwargs):
unavailability_calendar = build_unavailability_calendar(**kwargs)
unavailability_calendar.desks.add(desk)
setattr(desk, f'_{unavailability_calendar.slug.replace("-", "_")}', unavailability_calendar)
return unavailability_calendar
def add_desk(
agenda: Agenda,
label: str,
timeperiods=None,
exceptions=None,
exception_sources=None,
unavailability_calendars=None,
):
desk = Desk.objects.create(label=label, agenda=agenda, slug=slugify(label))
setattr(agenda, f'_{desk.slug.replace("-", "_")}', desk)
if timeperiods:
if not isinstance(timeperiods, list):
timeperiods = [timeperiods]
try:
build_timeperiods(desk, *timeperiods)
except Exception as e:
raise ValueError(f'invalid timeperiods "{e}": {timeperiods}')
for exception in exceptions or []:
add_exception(desk, **exception)
for exception_source in exception_sources or []:
add_exception_source(desk, **exception_source)
for unavailability_calendar in unavailability_calendars or []:
add_unavailability_calendar(desk, **unavailability_calendar)
return desk
def add_meeting_type(agenda: Agenda, slug: str, duration: int = None):
if duration is None:
if isinstance(slug, int) or ':' not in slug:
duration = int(slug)
slug = f'mt-{duration}'
else:
slug, duration = slug.split(':')
duration = int(duration)
meeting_type = MeetingType.objects.create(agenda=agenda, slug=slug, duration=duration)
setattr(agenda, f'_{meeting_type.slug.replace("-", "_")}', meeting_type)
return meeting_type
def add_day_timeperiod(
target: typing.Union[Desk, Agenda],
start: typing.Union[datetime.datetime, Interval],
end: typing.Union[datetime.datetime, int] = None,
):
if isinstance(target, Desk):
desk = target
else:
assert target.kind != 'events'
desk = target.desk_set.get()
if isinstance(start, Interval):
start, end = start
elif isinstance(end, int):
end = start + datetime.timdelta(minutes=end)
assert localtime(start).date() == localtime(end).date()
desk.timeperiod_set.create(
date=localtime(start).date(),
start_time=localtime(start).time().replace(tzinfo=None),
end_time=localtime(end).time().replace(tzinfo=None),
)
def add_exception(
target: typing.Union[Desk, Agenda, UnavailabilityCalendar],
start_datetime: typing.Union[datetime.datetime, Interval],
end_datetime: typing.Union[datetime.datetime, int] = None,
label: str = None,
):
desk = None
unavailability_calendar = None
if isinstance(target, Desk):
desk = target
elif isinstance(target, UnavailabilityCalendar):
unavailability_calendar = target
else:
assert target.kind != 'events'
desk = target.desk_set.get()
if isinstance(start_datetime, Interval):
start_datetime, end_datetime = start_datetime
elif isinstance(end_datetime, int):
end_datetime = start_datetime + datetime.timdelta(minutes=end_datetime)
timeperiodexception, _ = TimePeriodException.objects.get_or_create(
desk=desk,
unavailability_calendar=unavailability_calendar,
label=label,
start_datetime=start_datetime,
end_datetime=end_datetime,
unavailability_calendar__isnull=True,
group__isnull=True,
)
if label:
setattr(target, f'_{slugify(label)}', timeperiodexception)
return timeperiodexception
def add_exception_source(target: typing.Union[Desk, Agenda], ics_filename: str, ics_file):
desk = None
unavailability_calendar = None
if isinstance(target, Desk):
desk = target
elif isinstance(target, UnavailabilityCalendar):
unavailability_calendar = target
else:
assert target.kind != 'events'
desk = target.desk_set.get()
if isinstance(ics_file, str):
ics_file = ics_file.encode()
if isinstance(ics_file, bytes):
ics_file = ContentFile(ics_file, name=ics_filename)
timeperiodexceptionsource = TimePeriodExceptionSource.objects.create(
desk=desk,
unavailability_calendar=unavailability_calendar,
ics_filename=ics_filename,
ics_file=ics_file,
)
setattr(target, f'_{slugify(ics_filename.replace(".", "_"))}', timeperiodexceptionsource)
return timeperiodexceptionsource
def add_meeting(
target: typing.Union[Desk, Agenda],
start_datetime: datetime.datetime,
meeting_type: MeetingType = None,
**kwargs,
):
if isinstance(target, Desk):
desk = target
else:
assert target.kind != 'events'
desk = target.desk_set.get()
if meeting_type is None:
meeting_type = desk.agenda.meetingtype_set.get()
event = Event.objects.create(
agenda=desk.agenda,
desk=desk,
meeting_type=meeting_type,
start_datetime=start_datetime,
full=False,
places=1,
)
booking = Booking.objects.create(event=event, **kwargs)
event.booking = booking
return event
PARIS_TZ = zoneinfo.ZoneInfo('Europe/Paris')
UTC = zoneinfo.ZoneInfo('UTC')
def datetime_in_tz(s, tz):
duration = None
if ':' not in s.rsplit(' ')[-1]:
s, duration = s.rsplit(' ', 1)
if duration.endswith('h'):
duration = datetime.timedelta(hours=int(duration[:-1]))
elif duration.endswith('m'):
duration = datetime.timedelta(minutes=int(duration[:-1]))
else:
raise ValueError(f'bad datetime string: {s}')
dt = datetime.datetime.fromisoformat(s)
assert not dt.tzinfo, 's must not contain timezone offset'
dt = dt.replace(tzinfo=tz)
if duration:
return Interval(dt, dt + duration)
return dt
def build_agendas(description):
'''Mini-language to build agendas'''
regexp = re.compile(r'(?:\'[^\']+\'|"[^"]+"|[^ \'"]+)+')
call_frame = sys._getframe(1)
description = textwrap.dedent(description)
class Node(typing.NamedTuple):
tokens: list[str]
nodes: list
line_number: int
@classmethod
def from_tokens(cls, tokens, line_number):
return cls(tokens=tokens, nodes=[], line_number=line_number)
class Stack(typing.NamedTuple):
level: int
nodes: list
nodes = []
stack = [Stack(0, nodes)]
lines = list(enumerate(description.splitlines(), start=1))
def eval_value(value):
return eval(value, call_frame.f_globals, call_frame.f_locals) # pylint: disable=eval-used
def eval_label(value):
if value[0] in '"\'':
return value[1:-1]
return value
def syntax_error(msg, line_number=None, node=None):
line_number = line_number or node.line_number
parts = [f'{msg} on line {line_number}']
parts += [
f'{"> " if i == line_number else " "}{i}: {line}'
for i, line in lines[max(line_number - 4, 0) : min(line_number + 3, len(lines))]
]
return SyntaxError('\n'.join(parts))
for line_number, line in lines:
indentation = len(line) - len(line.lstrip())
tokens = []
for token in regexp.findall(line.strip()):
# comment
if token.startswith('#'):
break
if token:
tokens.append(token)
if not tokens:
continue
level = stack[-1].level
if indentation == stack[-1].level:
pass
elif indentation > level: # indent
stack.append(Stack(level=indentation, nodes=stack[-1].nodes[-1].nodes))
else: # deindent
while indentation < level:
stack.pop()
level = stack[-1].level
if level != indentation:
raise syntax_error('indentation mismatch', line_number=line_number)
stack[-1].nodes.append(Node.from_tokens(tokens, line_number))
def parse_unavailability_calendar(node):
dummy, *rest = node.tokens
try:
label, slug, *rest = rest
kwargs = {'label': label, 'slug': slug}
except ValueError:
(label,) = rest
kwargs = {'label': label}
exceptions = []
exception_sources = []
for node in node.nodes:
kind, *dummy = node.tokens
if kind == 'exception':
exceptions.append(parse_exception(node))
elif kind == 'exception-source':
exception_sources.append(parse_exception_source(node))
return {**kwargs, 'exceptions': exceptions, 'exception_sources': exception_sources}
def parse_node(node):
kind, *dummy = node.tokens
try:
if kind == 'meetings':
return build_meetings_agenda(**parse_meetings(node))
if kind == 'virtual':
return build_virtual_agenda(**parse_virtual(node))
if kind == 'unavailability-calendar':
return build_unavailability_calendar(**parse_unavailability_calendar(node))
raise ValueError('unknown node type')
except Exception as e:
raise syntax_error(e, node=node)
def parse_kwargs(tokens):
kwargs = {}
for token in tokens:
key, value = token.split('=', 1)
kwargs[key] = eval_value(value)
return kwargs
def parse_agenda_kwargs(tokens):
meeting_types = []
kwargs = {'meeting_types': meeting_types}
for token in tokens:
if not meeting_types:
if '=' in token:
key, value = token.split('=', 1)
kwargs[key] = eval_value(value)
continue
try:
meeting_types.append(int(token))
except Exception:
raise ValueError(f'invalid meeting type duration "{token}"')
return kwargs
def parse_exception(node):
dummy, *rest = node.tokens
kwargs = {}
try:
(label,) = rest
kwargs['label'] = eval_label(rest[0])
except ValueError:
try:
label, start_datetime = rest
start_datetime = eval_value(start_datetime)
return {'label': label, 'start_datetime': start_datetime}
except ValueError:
label, start_datetime, end_datetime, *rest = rest
start_datetime = eval_value(start_datetime)
end_datetime = eval_value(end_datetime)
return {'label': label, 'start_datetime': start_datetime, 'end_datetime': end_datetime}
else:
for node in node.nodes:
key, *rest = node.tokens
kwargs[key] = eval_value(' '.join(rest))
return kwargs
def parse_exception_source(node):
dummy, filename, *rest = node.tokens
return {'ics_filename': filename, 'ics_file': eval_value(' '.join(rest))}
def parse_desk(node):
kind, label, *inline_timeperiod = node.tokens
if kind != 'desk':
raise syntax_error('invalid desk description', node=node)
timeperiods = []
exceptions = []
exception_sources = []
unavailability_calendars = []
for node in node.nodes:
kind, *dummy = node.tokens
if kind == 'timeperiod':
timeperiods.append(parse_timeperiod(node))
elif kind == 'exception':
exceptions.append(parse_exception(node))
elif kind == 'exception-source':
exception_sources.append(parse_exception_source(node))
elif kind == 'unavailability-calendar':
unavailability_calendars.append(parse_unavailability_calendar(node))
else:
raise ValueError('unknown desk child node')
if inline_timeperiod:
timeperiods.insert(0, inline_timeperiod)
return {
'label': eval_label(label),
'timeperiods': timeperiods,
'exceptions': exceptions,
'exception_sources': exception_sources,
'unavailability_calendars': unavailability_calendars,
}
def parse_timeperiod(node):
kind, *defn = node.tokens
if kind != 'timeperiod':
raise syntax_error('invalid timeperiod description', node=node)
return defn
def parse_mt(node):
dummy, slug, *rest = node.tokens
if rest:
return {'slug': slug, 'duration': int(rest[0])}
else:
return slug
def parse_reminder_setting(node):
dummy, *rest = node.tokens
return parse_kwargs(rest)
def parse_meetings(node):
dummy, label, *rest = node.tokens
kwargs = parse_agenda_kwargs(rest)
desks = []
meeting_types = kwargs.setdefault('meeting_types', [])
for node in node.nodes:
kind, *rest = node.tokens
if kind == 'desk':
desks.append(parse_desk(node))
elif kind == 'meeting-type':
meeting_types.append(parse_mt(node))
elif kind == 'reminder-setting':
kwargs.setdefault('reminder_settings', []).append(parse_reminder_setting(node))
else:
raise ValueError('unknown meeting children')
return {**kwargs, 'label': eval_label(label), 'desks': desks}
def parse_virtual(node):
dummy, label, *rest = node.tokens
kwargs = parse_agenda_kwargs(rest)
return {**kwargs, 'label': eval_label(label), 'agendas': list(map(parse_meetings, node.nodes))}
objects = [parse_node(node) for node in nodes]
if len(objects) > 1:
return objects
else:
return objects[0]
def paris(s):
return datetime_in_tz(s, tz=PARIS_TZ)
def utc(s):
return datetime_in_tz(s, tz=UTC)