# chrono - agendas system # Copyright (C) 2023 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import datetime import re import sys import textwrap import typing import zoneinfo from django.core.files.base import ContentFile from django.utils.text import slugify from chrono.agendas.models import ( Agenda, AgendaReminderSettings, Booking, Desk, Event, MeetingType, Resource, TimePeriod, TimePeriodException, TimePeriodExceptionSource, UnavailabilityCalendar, ) from chrono.utils.interval import Interval from chrono.utils.timezone import localtime def login(app, username='admin', password='admin'): login_page = app.get('/login/') login_form = login_page.forms[0] login_form['username'] = username login_form['password'] = password resp = login_form.submit() assert resp.status_int == 302 return app def build_timeperiods(desk, *timeperiods): '''Build timeperiods for a desk. timperiods is a list of string with format: monday-friday;9:00-12:00,14:00-17:00 saturday;9:00-12:00 ''' for defn in timeperiods: if isinstance(defn, str): defn = defn.strip() parts = defn.split() else: parts = defn days = [] openings = [] first_opening = ([i for i, part in enumerate(parts) if part[:1].isdigit()] or [len(parts)])[0] days = parts[:first_opening] openings = parts[first_opening:] assert days and openings, defn for day in days: day_to_weekday = { 'monday': 0, 'tuesday': 1, 'wednesday': 2, 'thursday': 3, 'friday': 4, 'saturday': 5, 'sunday': 6, } if '-' in day: begin, end = day.split('-') begin_weekday = day_to_weekday[begin] end_weekday = day_to_weekday[end] assert begin_weekday < end_weekday, f'{begin} must be before {end}: {day}' weekdays = range(begin_weekday, end_weekday + 1) else: weekdays = [datetime.datetime.strptime(d, '%A').weekday() for d in day.split(',')] for weekday in weekdays: for opening in openings: start, end = opening.split('-') start_time = datetime.datetime.strptime(start, '%H:%M').time() end_time = datetime.datetime.strptime(end, '%H:%M').time() TimePeriod.objects.create( desk=desk, weekday=weekday, start_time=start_time, end_time=end_time ) def build_agenda(kind, label='Agenda', slug=None, **kwargs): agenda_kwargs = { 'label': label, 'slug': slug or slugify(label), 'kind': kind, } agenda_kwargs.update(kwargs) return Agenda.objects.create(**agenda_kwargs) def build_event_agenda(label='Agenda', slug=None, events=None, **kwargs): agenda = build_agenda('events', label=label, slug=slug, **(kwargs or {})) for label, event_defn in (events or {}).items(): event = Event.objects.create(agenda=agenda, label=label, **event_defn) setattr(agenda, f'_{event.slug.replace("-", "_")}', event) return agenda def build_meetings_agenda( label='Agenda', slug=None, meeting_types: typing.Union[int, tuple[str, int]] = None, desks: typing.Union[str, dict] = None, resources=None, reminder_settings: list = None, **kwargs, ): agenda = build_agenda('meetings', label=label, slug=slug, **(kwargs or {})) for meeting_type in meeting_types or []: if isinstance(meeting_type, (int, str)): meeting_type = {'slug': meeting_type} add_meeting_type(agenda, **meeting_type) if isinstance(desks, tuple): desks = [desks] if desks and isinstance(desks, list) and isinstance(desks[0], tuple): desks = dict(desks) if isinstance(desks, dict): desks = [{'label': key, 'timeperiods': value} for key, value in desks.items()] for desk_kwargs in desks or []: add_desk(agenda, **desk_kwargs) for reminder_setting in reminder_settings or []: AgendaReminderSettings.objects.create(agenda=agenda, **reminder_setting) for label in resources or []: slug = slugify(label) resource, _ = Resource.objects.get_or_create(slug=slugify(label), defaults={'label': label}) agenda.resources.add(resource) setattr(agenda, f'_re_{slug.replace("-", "_")}', resource) return agenda def build_virtual_agenda( label: str = 'Agenda', slug: str = None, agendas: list[Agenda] = None, meeting_types: list = None, **kwargs, ): agenda = build_agenda('virtual', label=label, slug=slug, **(kwargs or {})) if isinstance(agendas, dict): agendas = [{'label': label, **defn} for label, defn in agendas.items()] for agenda_defn in agendas or []: if isinstance(agenda_defn, Agenda): real_agenda = agenda_defn else: agenda_defn['meeting_types'] = agenda_defn.get('meeting_types') or meeting_types real_agenda = build_meetings_agenda(**agenda_defn) agenda.real_agendas.add(real_agenda) setattr(agenda, f'_{real_agenda.slug.replace("-", "_")}', real_agenda) for virtual_meeting_type in agenda.iter_meetingtypes(): virtual_meeting_type.agenda = agenda setattr(agenda, f'_{virtual_meeting_type.slug.replace("-", "_")}', virtual_meeting_type) return agenda def build_unavailability_calendar(label: str, slug: str = None, exceptions=None, exception_sources=None): slug = slug or slugify(label) unavailability_calendar, dummy = UnavailabilityCalendar.objects.update_or_create( slug=slug, defaults={'label': label} ) for exception in exceptions or []: add_exception(unavailability_calendar, **exception) for exception in exception_sources or []: add_exception_source(unavailability_calendar, **exception) return unavailability_calendar def add_unavailability_calendar(desk: Desk, **kwargs): unavailability_calendar = build_unavailability_calendar(**kwargs) unavailability_calendar.desks.add(desk) setattr(desk, f'_{unavailability_calendar.slug.replace("-", "_")}', unavailability_calendar) return unavailability_calendar def add_desk( agenda: Agenda, label: str, timeperiods=None, exceptions=None, exception_sources=None, unavailability_calendars=None, ): desk = Desk.objects.create(label=label, agenda=agenda, slug=slugify(label)) setattr(agenda, f'_{desk.slug.replace("-", "_")}', desk) if timeperiods: if not isinstance(timeperiods, list): timeperiods = [timeperiods] try: build_timeperiods(desk, *timeperiods) except Exception as e: raise ValueError(f'invalid timeperiods "{e}": {timeperiods}') for exception in exceptions or []: add_exception(desk, **exception) for exception_source in exception_sources or []: add_exception_source(desk, **exception_source) for unavailability_calendar in unavailability_calendars or []: add_unavailability_calendar(desk, **unavailability_calendar) return desk def add_meeting_type(agenda: Agenda, slug: str, label: str = None, duration: int = None): if duration is None: if isinstance(slug, int) or ':' not in slug: duration = int(slug) slug = f'mt-{duration}' else: slug, duration = slug.split(':') duration = int(duration) kwargs = {'agenda': agenda, 'slug': slug, 'duration': duration} if label: kwargs['label'] = label meeting_type = MeetingType.objects.create(**kwargs) setattr(agenda, f'_{meeting_type.slug.replace("-", "_")}', meeting_type) return meeting_type def add_day_timeperiod( target: typing.Union[Desk, Agenda], start: typing.Union[datetime.datetime, Interval], end: typing.Union[datetime.datetime, int] = None, ): if isinstance(target, Desk): desk = target else: assert target.kind != 'events' desk = target.desk_set.get() if isinstance(start, Interval): start, end = start elif isinstance(end, int): end = start + datetime.timdelta(minutes=end) assert localtime(start).date() == localtime(end).date() desk.timeperiod_set.create( date=localtime(start).date(), start_time=localtime(start).time().replace(tzinfo=None), end_time=localtime(end).time().replace(tzinfo=None), ) def add_exception( target: typing.Union[Desk, Agenda, UnavailabilityCalendar], start_datetime: typing.Union[datetime.datetime, Interval], end_datetime: typing.Union[datetime.datetime, int] = None, label: str = None, ): desk = None unavailability_calendar = None if isinstance(target, Desk): desk = target elif isinstance(target, UnavailabilityCalendar): unavailability_calendar = target else: assert target.kind != 'events' desk = target.desk_set.get() if isinstance(start_datetime, Interval): start_datetime, end_datetime = start_datetime elif isinstance(end_datetime, int): end_datetime = start_datetime + datetime.timdelta(minutes=end_datetime) timeperiodexception, _ = TimePeriodException.objects.get_or_create( desk=desk, unavailability_calendar=unavailability_calendar, label=label, start_datetime=start_datetime, end_datetime=end_datetime, unavailability_calendar__isnull=True, group__isnull=True, ) if label: setattr(target, f'_{slugify(label)}', timeperiodexception) return timeperiodexception def add_exception_source(target: typing.Union[Desk, Agenda], ics_filename: str, ics_file): desk = None unavailability_calendar = None if isinstance(target, Desk): desk = target elif isinstance(target, UnavailabilityCalendar): unavailability_calendar = target else: assert target.kind != 'events' desk = target.desk_set.get() if isinstance(ics_file, str): ics_file = ics_file.encode() if isinstance(ics_file, bytes): ics_file = ContentFile(ics_file, name=ics_filename) timeperiodexceptionsource = TimePeriodExceptionSource.objects.create( desk=desk, unavailability_calendar=unavailability_calendar, ics_filename=ics_filename, ics_file=ics_file, ) setattr(target, f'_{slugify(ics_filename.replace(".", "_"))}', timeperiodexceptionsource) return timeperiodexceptionsource def add_meeting( target: typing.Union[Desk, Agenda], start_datetime: datetime.datetime, meeting_type: typing.Union[str, MeetingType] = None, **kwargs, ): if isinstance(target, Desk): desk = target else: assert target.kind != 'events' desk = target.desk_set.get() if meeting_type is None: meeting_type = desk.agenda.meetingtype_set.get() elif isinstance(meeting_type, str): meeting_type = desk.agenda.meetingtype_set.get(slug=meeting_type) event = Event.objects.create( agenda=desk.agenda, desk=desk, meeting_type=meeting_type, start_datetime=start_datetime, full=False, places=1, ) booking = Booking.objects.create(event=event, **kwargs) event.booking = booking return event PARIS_TZ = zoneinfo.ZoneInfo('Europe/Paris') UTC = zoneinfo.ZoneInfo('UTC') def datetime_in_tz(s, tz): duration = None if ':' not in s.rsplit(' ')[-1]: s, duration = s.rsplit(' ', 1) if duration.endswith('h'): duration = datetime.timedelta(hours=int(duration[:-1])) elif duration.endswith('m'): duration = datetime.timedelta(minutes=int(duration[:-1])) else: raise ValueError(f'bad datetime string: {s}') dt = datetime.datetime.fromisoformat(s) assert not dt.tzinfo, 's must not contain timezone offset' dt = dt.replace(tzinfo=tz) if duration: return Interval(dt, dt + duration) return dt def build_agendas(description): '''Mini-language to build agendas''' regexp = re.compile(r'(?:\'[^\']+\'|"[^"]+"|[^ \'"]+)+') call_frame = sys._getframe(1) description = textwrap.dedent(description) class Node(typing.NamedTuple): tokens: list[str] nodes: list line_number: int @classmethod def from_tokens(cls, tokens, line_number): return cls(tokens=tokens, nodes=[], line_number=line_number) class Stack(typing.NamedTuple): level: int nodes: list nodes = [] stack = [Stack(0, nodes)] lines = list(enumerate(description.splitlines(), start=1)) def eval_value(value): return eval(value, call_frame.f_globals, call_frame.f_locals) # pylint: disable=eval-used def eval_label(value): if value[0] in '"\'': return value[1:-1] return value def syntax_error(msg, line_number=None, node=None): line_number = line_number or node.line_number parts = [f'{msg} on line {line_number}'] parts += [ f'{"> " if i == line_number else " "}{i}: {line}' for i, line in lines[max(line_number - 4, 0) : min(line_number + 3, len(lines))] ] return SyntaxError('\n'.join(parts)) for line_number, line in lines: indentation = len(line) - len(line.lstrip()) tokens = [] for token in regexp.findall(line.strip()): # comment if token.startswith('#'): break if token: tokens.append(token) if not tokens: continue level = stack[-1].level if indentation == stack[-1].level: pass elif indentation > level: # indent stack.append(Stack(level=indentation, nodes=stack[-1].nodes[-1].nodes)) else: # deindent while indentation < level: stack.pop() level = stack[-1].level if level != indentation: raise syntax_error('indentation mismatch', line_number=line_number) stack[-1].nodes.append(Node.from_tokens(tokens, line_number)) def parse_unavailability_calendar(node): dummy, *rest = node.tokens try: label, slug, *rest = rest kwargs = {'label': label, 'slug': slug} except ValueError: (label,) = rest kwargs = {'label': label} exceptions = [] exception_sources = [] for node in node.nodes: kind, *dummy = node.tokens if kind == 'exception': exceptions.append(parse_exception(node)) elif kind == 'exception-source': exception_sources.append(parse_exception_source(node)) return {**kwargs, 'exceptions': exceptions, 'exception_sources': exception_sources} def parse_node(node): kind, *dummy = node.tokens try: if kind == 'meetings': return build_meetings_agenda(**parse_meetings(node)) if kind == 'virtual': return build_virtual_agenda(**parse_virtual(node)) if kind == 'unavailability-calendar': return build_unavailability_calendar(**parse_unavailability_calendar(node)) raise ValueError('unknown node type') except Exception as e: raise syntax_error(e, node=node) def parse_kwargs(tokens): kwargs = {} for token in tokens: key, value = token.split('=', 1) kwargs[key] = eval_value(value) return kwargs def parse_agenda_kwargs(tokens): meeting_types = [] kwargs = {'meeting_types': meeting_types} for token in tokens: if not meeting_types: if '=' in token: key, value = token.split('=', 1) kwargs[key] = eval_value(value) continue try: meeting_types.append(int(token)) except Exception: raise ValueError(f'invalid meeting type duration "{token}"') return kwargs def parse_exception(node): dummy, *rest = node.tokens kwargs = {} try: (label,) = rest kwargs['label'] = eval_label(rest[0]) except ValueError: try: label, start_datetime = rest start_datetime = eval_value(start_datetime) return {'label': label, 'start_datetime': start_datetime} except ValueError: label, start_datetime, end_datetime, *rest = rest start_datetime = eval_value(start_datetime) end_datetime = eval_value(end_datetime) return {'label': label, 'start_datetime': start_datetime, 'end_datetime': end_datetime} else: for node in node.nodes: key, *rest = node.tokens kwargs[key] = eval_value(' '.join(rest)) return kwargs def parse_exception_source(node): dummy, filename, *rest = node.tokens return {'ics_filename': filename, 'ics_file': eval_value(' '.join(rest))} def parse_desk(node): kind, label, *inline_timeperiod = node.tokens if kind != 'desk': raise syntax_error('invalid desk description', node=node) timeperiods = [] exceptions = [] exception_sources = [] unavailability_calendars = [] for node in node.nodes: kind, *dummy = node.tokens if kind == 'timeperiod': timeperiods.append(parse_timeperiod(node)) elif kind == 'exception': exceptions.append(parse_exception(node)) elif kind == 'exception-source': exception_sources.append(parse_exception_source(node)) elif kind == 'unavailability-calendar': unavailability_calendars.append(parse_unavailability_calendar(node)) else: raise ValueError('unknown desk child node') if inline_timeperiod: timeperiods.insert(0, inline_timeperiod) return { 'label': eval_label(label), 'timeperiods': timeperiods, 'exceptions': exceptions, 'exception_sources': exception_sources, 'unavailability_calendars': unavailability_calendars, } def parse_timeperiod(node): kind, *defn = node.tokens if kind != 'timeperiod': raise syntax_error('invalid timeperiod description', node=node) return defn def parse_mt(node): dummy, *rest = node.tokens if len(rest) == 1: return rest[0] elif len(rest) == 2: return {'slug': rest[0], 'duration': int(rest[1])} else: return {'label': eval_label(rest[0]), 'slug': rest[1], 'duration': int(rest[2])} def parse_reminder_setting(node): dummy, *rest = node.tokens return parse_kwargs(rest) def parse_meetings(node): dummy, label, *rest = node.tokens kwargs = parse_agenda_kwargs(rest) desks = [] meeting_types = kwargs.setdefault('meeting_types', []) for node in node.nodes: kind, *rest = node.tokens if kind == 'desk': desks.append(parse_desk(node)) elif kind == 'meeting-type': meeting_types.append(parse_mt(node)) elif kind == 'reminder-setting': kwargs.setdefault('reminder_settings', []).append(parse_reminder_setting(node)) else: raise ValueError('unknown meeting children') return {**kwargs, 'label': eval_label(label), 'desks': desks} def parse_virtual(node): dummy, label, *rest = node.tokens kwargs = parse_agenda_kwargs(rest) return {**kwargs, 'label': eval_label(label), 'agendas': list(map(parse_meetings, node.nodes))} objects = [parse_node(node) for node in nodes] if len(objects) > 1: return objects else: return objects[0] def paris(s): return datetime_in_tz(s, tz=PARIS_TZ) def utc(s): return datetime_in_tz(s, tz=UTC)