Compare commits

...

6 Commits

Author SHA1 Message Date
Emmanuel Cazenave 2f48b1057f wip 2023-05-15 15:36:25 +02:00
Benjamin Dauvergne 8748be2b22 agendas: implements free time calculation (#76335)
gitea/chrono/pipeline/head This commit looks good Details
SharedTimePeriod gets a get_intervals(mintime, maxtime) method returning the
list of intervals of open time between mintime and maxtime.

Agenda gets a get_free_time(mintime, maxtime) method returning the
list of intervals of open time between mintime and maxtim.
2023-04-21 13:31:42 +02:00
Benjamin Dauvergne ae99d87e27 tests: add helper functions to manage meetings agendas (#76335) 2023-04-21 13:31:42 +02:00
Benjamin Dauvergne 33b4c807b4 utils: add IntervalSet.__add__ (#76335)
Most algo around agendas amounts to adding a bunch of intervals them
removing some. The method to add them was missing.
2023-04-21 13:30:54 +02:00
Benjamin Dauvergne 246e62d96b misc: move interval module in chrono.utils (#76335)
Just some cleaning.
2023-04-21 13:30:53 +02:00
Benjamin Dauvergne 60b1608f93 agendas: move get_all_slots() and get_min/max_datetime() as Agenda's methods (#76335) 2023-04-21 13:29:20 +02:00
8 changed files with 1571 additions and 328 deletions

View File

@ -0,0 +1,22 @@
# Generated by Django 3.2.18 on 2023-05-10 16:22
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('agendas', '0152_auto_20230331_0834'),
]
operations = [
migrations.AddField(
model_name='meetingtype',
name='date_end',
field=models.DateField(blank=True, null=True, verbose_name='End'),
),
migrations.AddField(
model_name='meetingtype',
name='date_start',
field=models.DateField(blank=True, null=True, verbose_name='Start'),
),
]

View File

@ -74,9 +74,9 @@ from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
from django.utils.translation import ngettext, pgettext_lazy
from chrono.interval import Interval, IntervalSet
from chrono.utils.date import get_weekday_index
from chrono.utils.db import ArraySubquery, SumCardinality
from chrono.utils.interval import Interval, IntervalSet
from chrono.utils.misc import AgendaImportError, ICSError, clean_import_data, generate_slug
from chrono.utils.publik_urls import translate_from_publik_url
from chrono.utils.requests_wrapper import requests as requests_wrapper
@ -164,6 +164,11 @@ def booking_template_validator(value):
pass
TimeSlot = collections.namedtuple(
'TimeSlot', ['start_datetime', 'end_datetime', 'full', 'desk', 'booked_for_external_user']
)
class Agenda(models.Model):
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
@ -1108,6 +1113,427 @@ class Agenda(models.Model):
return True
def get_min_datetime(self, start_datetime=None):
if self.minimal_booking_delay is None:
return start_datetime
if start_datetime is None:
return self.min_booking_datetime
return max(self.min_booking_datetime, start_datetime)
def get_max_datetime(self, end_datetime=None):
if self.maximal_booking_delay is None:
return end_datetime
if end_datetime is None:
return self.max_booking_datetime
return min(self.max_booking_datetime, end_datetime)
def get_all_slots(
self,
meeting_type,
resources=None,
unique=False,
start_datetime=None,
end_datetime=None,
user_external_id=None,
):
"""Get all occupation state of all possible slots for the given agenda (of
its real agendas for a virtual agenda) and the given meeting_type.
The process is done in four phases:
- first phase: aggregate time intervals, during which a meeting is impossible
due to TimePeriodException models, by desk in IntervalSet (compressed
and ordered list of intervals).
- second phase: aggregate time intervals by desk for already booked slots, again
to make IntervalSet,
- third phase: for a meetings agenda, if resources has to be booked,
aggregate time intervals for already booked resources, to make IntervalSet.
- fourth and last phase: generate time slots from each time period based
on the time period definition and on the desk's respective agenda real
min/max_datetime; for each time slot check its status in the exclusion
and bookings sets.
If it is excluded, ignore it completely.
It if is booked, report the slot as full.
"""
assert self.kind != 'events', 'get_all_slots() does not work on events agendas'
resources = resources or []
# virtual agendas have one constraint :
# all the real agendas MUST have the same meetingstypes, the consequence is
# that the base_meeting_duration for the virtual agenda is always the same
# as the base meeting duration of each real agenda.
base_meeting_duration = self.get_base_meeting_duration()
max_meeting_duration_td = datetime.timedelta(minutes=self.get_max_meeting_duration())
base_min_datetime = self.get_min_datetime(start_datetime)
base_max_datetime = self.get_max_datetime(end_datetime)
meeting_duration = meeting_type.duration
meeting_duration_td = datetime.timedelta(minutes=meeting_duration)
now_datetime = now()
base_date = now_datetime.date()
agendas = self.get_real_agendas()
# regroup agendas by their opening period
agenda_ids_by_min_max_datetimes = collections.defaultdict(set)
agenda_id_min_max_datetime = {}
for agenda in agendas:
used_min_datetime = base_min_datetime
if self.minimal_booking_delay is None:
used_min_datetime = agenda.get_min_datetime(start_datetime)
used_max_datetime = base_max_datetime
if self.maximal_booking_delay is None:
used_max_datetime = agenda.get_max_datetime(end_datetime)
agenda_ids_by_min_max_datetimes[(used_min_datetime, used_max_datetime)].add(agenda.id)
agenda_id_min_max_datetime[agenda.id] = (used_min_datetime, used_max_datetime)
# aggregate time period exceptions by desk as IntervalSet for fast querying
# 1. sort exceptions by start_datetime
# 2. group them by desk
# 3. convert each desk's list of exception to intervals then IntervalSet
desks_exceptions = {
time_period_desk: IntervalSet.from_ordered(
map(TimePeriodException.as_interval, time_period_exceptions)
)
for time_period_desk, time_period_exceptions in itertools.groupby(
TimePeriodException.objects.filter(desk__agenda__in=agendas)
.select_related('desk')
.order_by('desk_id', 'start_datetime', 'end_datetime'),
key=lambda time_period: time_period.desk,
)
}
# add exceptions from unavailability calendar
time_period_exception_queryset = (
TimePeriodException.objects.all()
.select_related('unavailability_calendar')
.prefetch_related(
Prefetch(
'unavailability_calendar__desks',
queryset=Desk.objects.filter(agenda__in=agendas),
to_attr='prefetched_desks',
)
)
.filter(unavailability_calendar__desks__agenda__in=agendas)
.order_by('start_datetime', 'end_datetime')
)
for time_period_exception in time_period_exception_queryset:
# unavailability calendar can be used in all desks;
# ignore desks outside of current agenda(s)
for desk in time_period_exception.unavailability_calendar.prefetched_desks:
if desk not in desks_exceptions:
desks_exceptions[desk] = IntervalSet()
desks_exceptions[desk].add(
time_period_exception.start_datetime, time_period_exception.end_datetime
)
# compute reduced min/max_datetime windows by desks based on exceptions
desk_min_max_datetime = {}
for desk, desk_exception in desks_exceptions.items():
base = IntervalSet([agenda_id_min_max_datetime[desk.agenda_id]])
base = base - desk_exception
if not base:
# ignore this desk, exceptions cover all opening time
# use an empty interval (begin == end) for this
desk_min_max_datetime[desk] = (now_datetime, now_datetime)
continue
min_datetime = base.min().replace(hour=0, minute=0, second=0, microsecond=0)
if base_min_datetime:
min_datetime = max(min_datetime, base_min_datetime)
max_datetime = base.max()
if base_max_datetime:
max_datetime = min(max_datetime, base_max_datetime)
desk_min_max_datetime[desk] = (min_datetime, max_datetime)
# aggregate already booked time intervals by desk
bookings = {}
for (used_min_datetime, used_max_datetime), agenda_ids in agenda_ids_by_min_max_datetimes.items():
booked_events = (
Event.objects.filter(
agenda__in=agenda_ids,
start_datetime__gte=used_min_datetime - max_meeting_duration_td,
start_datetime__lte=used_max_datetime,
)
.exclude(booking__cancellation_datetime__isnull=False)
# ordering is important for the later groupby, it works like sort | uniq
.order_by('desk_id', 'start_datetime', 'meeting_type__duration')
.values_list('desk_id', 'start_datetime', 'meeting_type__duration')
)
# compute exclusion set by desk from all bookings, using
# itertools.groupby() to group them by desk_id
bookings.update(
(
desk_id,
IntervalSet.from_ordered(
(
event_start_datetime,
event_start_datetime + datetime.timedelta(minutes=event_duration),
)
for desk_id, event_start_datetime, event_duration in values
),
)
for desk_id, values in itertools.groupby(booked_events, lambda be: be[0])
)
# aggregate already booked time intervals for resources
resources_bookings = IntervalSet()
if self.kind == 'meetings' and resources:
used_min_datetime, used_max_datetime = agenda_id_min_max_datetime[self.pk]
event_ids_queryset = Event.resources.through.objects.filter(
resource__in=[r.pk for r in resources]
).values('event')
booked_events = (
Event.objects.filter(
pk__in=event_ids_queryset,
start_datetime__gte=used_min_datetime - max_meeting_duration_td,
start_datetime__lte=used_max_datetime,
)
.exclude(booking__cancellation_datetime__isnull=False)
.order_by('start_datetime', 'meeting_type__duration')
.values_list('start_datetime', 'meeting_type__duration')
)
# compute exclusion set
resources_bookings = IntervalSet.from_ordered(
(event_start_datetime, event_start_datetime + datetime.timedelta(minutes=event_duration))
for event_start_datetime, event_duration in booked_events
)
# aggregate already booked time intervals by excluded_user_external_id
user_bookings = IntervalSet()
if user_external_id:
used_min_datetime, used_max_datetime = (
min(v[0] for v in agenda_id_min_max_datetime.values()),
max(v[1] for v in agenda_id_min_max_datetime.values()),
)
booked_events = (
Event.objects.filter(
agenda__in=agendas,
start_datetime__gte=used_min_datetime - max_meeting_duration_td,
start_datetime__lte=used_max_datetime,
booking__user_external_id=user_external_id,
)
.exclude(booking__cancellation_datetime__isnull=False)
# ordering is important for the later groupby, it works like sort | uniq
.order_by('start_datetime', 'meeting_type__duration')
.values_list('start_datetime', 'meeting_type__duration')
)
# compute exclusion set by desk from all bookings, using
# itertools.groupby() to group them by desk_id
user_bookings = IntervalSet.from_ordered(
(
event_start_datetime,
event_start_datetime + datetime.timedelta(minutes=event_duration),
)
for event_start_datetime, event_duration in booked_events
)
unique_booked = {}
for time_period in self.get_effective_time_periods(base_min_datetime, base_max_datetime):
duration = (
datetime.datetime.combine(base_date, time_period.end_time)
- datetime.datetime.combine(base_date, time_period.start_time)
).seconds / 60
if duration < meeting_type.duration:
# skip time period that can't even hold a single meeting
continue
desks_by_min_max_datetime = collections.defaultdict(list)
for desk in time_period.desks:
min_max = desk_min_max_datetime.get(desk, agenda_id_min_max_datetime[desk.agenda_id])
desks_by_min_max_datetime[min_max].append(desk)
# aggregate agendas based on their real min/max_datetime :
# the get_time_slots() result is dependant upon these values, so even
# if we deduplicated a TimePeriod for some desks, if their respective
# agendas have different real min/max_datetime we must unduplicate them
# at time slot generation phase.
for (used_min_datetime, used_max_datetime), desks in desks_by_min_max_datetime.items():
for start_datetime in time_period.get_time_slots(
min_datetime=used_min_datetime,
max_datetime=used_max_datetime,
meeting_duration=meeting_duration,
base_duration=base_meeting_duration,
):
end_datetime = start_datetime + meeting_duration_td
timestamp = start_datetime.timestamp()
# skip generating datetimes if we already know that this
# datetime is available
if unique and unique_booked.get(timestamp) is False:
continue
for desk in sorted(desks, key=lambda desk: desk.label):
# ignore the slot for this desk, if it overlaps and exclusion period for this desk
excluded = desk in desks_exceptions and desks_exceptions[desk].overlaps(
start_datetime, end_datetime
)
if excluded:
continue
# slot is full if an already booked event overlaps it
# check resources first
booked = resources_bookings.overlaps(start_datetime, end_datetime)
# then check user boookings
booked_for_external_user = user_bookings.overlaps(start_datetime, end_datetime)
booked = booked or booked_for_external_user
# then bookings if resources are free
if not booked:
booked = desk.id in bookings and bookings[desk.id].overlaps(
start_datetime, end_datetime
)
if unique and unique_booked.get(timestamp) is booked:
continue
unique_booked[timestamp] = booked
yield TimeSlot(
start_datetime=start_datetime,
end_datetime=end_datetime,
desk=desk,
full=booked,
booked_for_external_user=booked_for_external_user,
)
if unique and not booked:
break
def get_free_time(
self,
start_datetime=None,
end_datetime=None,
):
"""Get open time on this agenda in the future.
The process is done in three phases:
1. aggregate exceptions by desk,
2. aggregate booked events by desks,
3. for each desk compute the normal opening time based on timeperiods,
then remove expceptions and booked events, and aggregate all that as
the result.
"""
assert self.kind != 'events', 'get_all_slots() does not work on events agendas'
max_meeting_duration_td = datetime.timedelta(minutes=self.get_max_meeting_duration())
base_min_datetime = self.get_min_datetime(start_datetime) or now()
base_max_datetime = self.get_max_datetime(end_datetime)
agendas = self.get_real_agendas()
# regroup agendas by their opening period
agenda_ids_by_min_max_datetimes = collections.defaultdict(set)
agenda_id_min_max_datetime = {}
for agenda in agendas:
used_min_datetime = base_min_datetime
if self.minimal_booking_delay is None:
used_min_datetime = agenda.get_min_datetime(start_datetime)
used_max_datetime = base_max_datetime
if self.maximal_booking_delay is None:
used_max_datetime = agenda.get_max_datetime(end_datetime)
agenda_ids_by_min_max_datetimes[(used_min_datetime, used_max_datetime)].add(agenda.id)
agenda_id_min_max_datetime[agenda.id] = (used_min_datetime, used_max_datetime)
# aggregate time period exceptions by desk as IntervalSet for fast querying
# 1. sort exceptions by start_datetime
# 2. group them by desk
# 3. convert each desk's list of exception to intervals then IntervalSet
desks_exceptions = {
time_period_desk.id: IntervalSet.from_ordered(
map(TimePeriodException.as_interval, time_period_exceptions)
)
for time_period_desk, time_period_exceptions in itertools.groupby(
TimePeriodException.objects.filter(desk__agenda__in=agendas)
.select_related('desk')
.order_by('desk_id', 'start_datetime', 'end_datetime'),
key=lambda time_period: time_period.desk,
)
}
# add exceptions from unavailability calendar
time_period_exception_queryset = (
TimePeriodException.objects.all()
.select_related('unavailability_calendar')
.prefetch_related(
Prefetch(
'unavailability_calendar__desks',
queryset=Desk.objects.filter(agenda__in=agendas),
to_attr='prefetched_desks',
)
)
.filter(unavailability_calendar__desks__agenda__in=agendas)
.order_by('start_datetime', 'end_datetime')
)
for time_period_exception in time_period_exception_queryset:
# unavailability calendar can be used in all desks;
# ignore desks outside of current agenda(s)
for desk in time_period_exception.unavailability_calendar.prefetched_desks:
if desk.id not in desks_exceptions:
desks_exceptions[desk.id] = IntervalSet()
desks_exceptions[desk.id].add(
time_period_exception.start_datetime, time_period_exception.end_datetime
)
# aggregate already booked time intervals by desk
bookings = {}
for (used_min_datetime, used_max_datetime), agenda_ids in agenda_ids_by_min_max_datetimes.items():
booked_events = (
Event.objects.filter(
agenda__in=agenda_ids,
start_datetime__gte=used_min_datetime - max_meeting_duration_td,
start_datetime__lte=used_max_datetime,
)
.exclude(booking__cancellation_datetime__isnull=False)
# ordering is important for the later groupby, it works like sort | uniq
.order_by('desk_id', 'start_datetime', 'meeting_type__duration')
.values_list('desk_id', 'start_datetime', 'meeting_type__duration')
)
# compute exclusion set by desk from all bookings, using
# itertools.groupby() to group them by desk_id
bookings.update(
(
desk_id,
IntervalSet.from_ordered(
(
event_start_datetime,
event_start_datetime + datetime.timedelta(minutes=event_duration),
)
for desk_id, event_start_datetime, event_duration in values
),
)
for desk_id, values in itertools.groupby(booked_events, lambda be: be[0])
)
# aggregate open times by desks
free_time_by_desk = {}
for time_period in self.get_effective_time_periods(base_min_datetime, base_max_datetime):
base_interval_set = IntervalSet.from_ordered(
time_period.get_intervals(base_min_datetime, base_max_datetime)
)
for desk in time_period.desks:
if desk in free_time_by_desk:
free_time_by_desk[desk] += base_interval_set
else:
free_time_by_desk[desk] = base_interval_set
# reduce desks' open time by agenda effective min/max datetime
free_time = []
for desk, value in free_time_by_desk.items():
min_max = agenda_id_min_max_datetime[desk.agenda_id]
desk_free_time = value
if not desk_free_time:
continue
if desk_free_time.min() < min_max[0]:
desk_free_time -= IntervalSet([(free_time.min(), min_max[0])])
if min_max[1] < desk_free_time.max():
desk_free_time -= IntervalSet([(min_max[1], free_time.max())])
if desk.id in desks_exceptions:
desk_free_time -= desks_exceptions[desk.id]
if desk.id in bookings:
desk_free_time -= bookings[desk.id]
if free_time is None:
free_time = desk_free_time
else:
free_time += desk_free_time
return free_time
class VirtualMember(models.Model):
"""Trough model to link virtual agendas to their real agendas.
@ -1309,9 +1735,9 @@ class SharedTimePeriod:
self.date = date
self.desks = set(desks)
def __str__(self):
def __repr__(self):
return '%s / %s%s' % (
force_str(WEEKDAYS[self.weekday]),
WEEKDAYS[self.weekday],
date_format(self.start_time, 'TIME_FORMAT'),
date_format(self.end_time, 'TIME_FORMAT'),
)
@ -1428,12 +1854,40 @@ class SharedTimePeriod:
desks=desks,
)
def get_intervals(self, min_datetime, max_datetime):
"""Generate all possible intervals of time between min_datetime and
max_datetime, corresponding to the this timeperiod.
"""
min_datetime = localtime(min_datetime)
max_datetime = localtime(max_datetime)
if self.date:
# if self.date if out of the current period, returns early
if not (min_datetime.date() <= self.date <= max_datetime.date()):
return
start_datetime = make_aware(datetime.datetime.combine(self.date, self.start_time))
else:
start_datetime = make_aware(datetime.datetime.combine(min_datetime.date(), self.start_time))
start_datetime += datetime.timedelta(days=self.weekday - min_datetime.weekday())
if start_datetime < min_datetime:
start_datetime += datetime.timedelta(days=7)
while start_datetime < max_datetime:
if not self.weekday_indexes or get_weekday_index(start_datetime) in self.weekday_indexes:
end_datetime = make_aware(datetime.datetime.combine(start_datetime.date(), self.end_time))
yield (max(start_datetime, min_datetime), min(end_datetime, max_datetime))
if self.date:
break
start_datetime += datetime.timedelta(days=7)
class MeetingType(models.Model):
agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160)
duration = models.IntegerField(_('Duration (in minutes)'), default=30, validators=[MinValueValidator(1)])
date_start = models.DateField(_('Start'), null=True, blank=True)
date_end = models.DateField(_('End'), null=True, blank=True)
deleted = models.BooleanField(_('Deleted'), default=False)
class Meta:

View File

@ -17,7 +17,6 @@
import collections
import copy
import datetime
import itertools
import json
import uuid
@ -51,11 +50,9 @@ from chrono.agendas.models import (
MeetingType,
SharedCustodyAgenda,
Subscription,
TimePeriodException,
)
from chrono.api import serializers
from chrono.api.utils import APIError, APIErrorBadRequest, Response
from chrono.interval import IntervalSet
from chrono.utils.publik_urls import translate_to_publik_url
from chrono.utils.timezone import localtime, make_aware, now
@ -68,291 +65,6 @@ def format_response_date(dt):
return localtime(dt).strftime('%Y-%m-%d')
def get_min_datetime(agenda, start_datetime=None):
if agenda.minimal_booking_delay is None:
return start_datetime
if start_datetime is None:
return agenda.min_booking_datetime
return max(agenda.min_booking_datetime, start_datetime)
def get_max_datetime(agenda, end_datetime=None):
if agenda.maximal_booking_delay is None:
return end_datetime
if end_datetime is None:
return agenda.max_booking_datetime
return min(agenda.max_booking_datetime, end_datetime)
TimeSlot = collections.namedtuple(
'TimeSlot', ['start_datetime', 'end_datetime', 'full', 'desk', 'booked_for_external_user']
)
def get_all_slots(
base_agenda,
meeting_type,
resources=None,
unique=False,
start_datetime=None,
end_datetime=None,
user_external_id=None,
):
"""Get all occupation state of all possible slots for the given agenda (of
its real agendas for a virtual agenda) and the given meeting_type.
The process is done in four phases:
- first phase: aggregate time intervals, during which a meeting is impossible
due to TimePeriodException models, by desk in IntervalSet (compressed
and ordered list of intervals).
- second phase: aggregate time intervals by desk for already booked slots, again
to make IntervalSet,
- third phase: for a meetings agenda, if resources has to be booked,
aggregate time intervals for already booked resources, to make IntervalSet.
- fourth and last phase: generate time slots from each time period based
on the time period definition and on the desk's respective agenda real
min/max_datetime; for each time slot check its status in the exclusion
and bookings sets.
If it is excluded, ignore it completely.
It if is booked, report the slot as full.
"""
resources = resources or []
# virtual agendas have one constraint :
# all the real agendas MUST have the same meetingstypes, the consequence is
# that the base_meeting_duration for the virtual agenda is always the same
# as the base meeting duration of each real agenda.
base_meeting_duration = base_agenda.get_base_meeting_duration()
max_meeting_duration_td = datetime.timedelta(minutes=base_agenda.get_max_meeting_duration())
base_min_datetime = get_min_datetime(base_agenda, start_datetime)
base_max_datetime = get_max_datetime(base_agenda, end_datetime)
meeting_duration = meeting_type.duration
meeting_duration_td = datetime.timedelta(minutes=meeting_duration)
now_datetime = now()
base_date = now_datetime.date()
agendas = base_agenda.get_real_agendas()
# regroup agendas by their opening period
agenda_ids_by_min_max_datetimes = collections.defaultdict(set)
agenda_id_min_max_datetime = {}
for agenda in agendas:
used_min_datetime = base_min_datetime
if base_agenda.minimal_booking_delay is None:
used_min_datetime = get_min_datetime(agenda, start_datetime)
used_max_datetime = base_max_datetime
if base_agenda.maximal_booking_delay is None:
used_max_datetime = get_max_datetime(agenda, end_datetime)
agenda_ids_by_min_max_datetimes[(used_min_datetime, used_max_datetime)].add(agenda.id)
agenda_id_min_max_datetime[agenda.id] = (used_min_datetime, used_max_datetime)
# aggregate time period exceptions by desk as IntervalSet for fast querying
# 1. sort exceptions by start_datetime
# 2. group them by desk
# 3. convert each desk's list of exception to intervals then IntervalSet
desks_exceptions = {
time_period_desk: IntervalSet.from_ordered(
map(TimePeriodException.as_interval, time_period_exceptions)
)
for time_period_desk, time_period_exceptions in itertools.groupby(
TimePeriodException.objects.filter(desk__agenda__in=agendas)
.select_related('desk')
.order_by('desk_id', 'start_datetime', 'end_datetime'),
key=lambda time_period: time_period.desk,
)
}
# add exceptions from unavailability calendar
time_period_exception_queryset = (
TimePeriodException.objects.all()
.select_related('unavailability_calendar')
.prefetch_related(
Prefetch(
'unavailability_calendar__desks',
queryset=Desk.objects.filter(agenda__in=agendas),
to_attr='prefetched_desks',
)
)
.filter(unavailability_calendar__desks__agenda__in=agendas)
.order_by('start_datetime', 'end_datetime')
)
for time_period_exception in time_period_exception_queryset:
# unavailability calendar can be used in all desks;
# ignore desks outside of current agenda(s)
for desk in time_period_exception.unavailability_calendar.prefetched_desks:
if desk not in desks_exceptions:
desks_exceptions[desk] = IntervalSet()
desks_exceptions[desk].add(
time_period_exception.start_datetime, time_period_exception.end_datetime
)
# compute reduced min/max_datetime windows by desks based on exceptions
desk_min_max_datetime = {}
for desk, desk_exception in desks_exceptions.items():
base = IntervalSet([agenda_id_min_max_datetime[desk.agenda_id]])
base = base - desk_exception
if not base:
# ignore this desk, exceptions cover all opening time
# use an empty interval (begin == end) for this
desk_min_max_datetime[desk] = (now_datetime, now_datetime)
continue
min_datetime = base.min().replace(hour=0, minute=0, second=0, microsecond=0)
if base_min_datetime:
min_datetime = max(min_datetime, base_min_datetime)
max_datetime = base.max()
if base_max_datetime:
max_datetime = min(max_datetime, base_max_datetime)
desk_min_max_datetime[desk] = (min_datetime, max_datetime)
# aggregate already booked time intervals by desk
bookings = {}
for (used_min_datetime, used_max_datetime), agenda_ids in agenda_ids_by_min_max_datetimes.items():
booked_events = (
Event.objects.filter(
agenda__in=agenda_ids,
start_datetime__gte=used_min_datetime - max_meeting_duration_td,
start_datetime__lte=used_max_datetime,
)
.exclude(booking__cancellation_datetime__isnull=False)
# ordering is important for the later groupby, it works like sort | uniq
.order_by('desk_id', 'start_datetime', 'meeting_type__duration')
.values_list('desk_id', 'start_datetime', 'meeting_type__duration')
)
# compute exclusion set by desk from all bookings, using
# itertools.groupby() to group them by desk_id
bookings.update(
(
desk_id,
IntervalSet.from_ordered(
(event_start_datetime, event_start_datetime + datetime.timedelta(minutes=event_duration))
for desk_id, event_start_datetime, event_duration in values
),
)
for desk_id, values in itertools.groupby(booked_events, lambda be: be[0])
)
# aggregate already booked time intervals for resources
resources_bookings = IntervalSet()
if base_agenda.kind == 'meetings' and resources:
used_min_datetime, used_max_datetime = agenda_id_min_max_datetime[base_agenda.pk]
event_ids_queryset = Event.resources.through.objects.filter(
resource__in=[r.pk for r in resources]
).values('event')
booked_events = (
Event.objects.filter(
pk__in=event_ids_queryset,
start_datetime__gte=used_min_datetime - max_meeting_duration_td,
start_datetime__lte=used_max_datetime,
)
.exclude(booking__cancellation_datetime__isnull=False)
.order_by('start_datetime', 'meeting_type__duration')
.values_list('start_datetime', 'meeting_type__duration')
)
# compute exclusion set
resources_bookings = IntervalSet.from_ordered(
(event_start_datetime, event_start_datetime + datetime.timedelta(minutes=event_duration))
for event_start_datetime, event_duration in booked_events
)
# aggregate already booked time intervals by excluded_user_external_id
user_bookings = IntervalSet()
if user_external_id:
used_min_datetime, used_max_datetime = (
min(v[0] for v in agenda_id_min_max_datetime.values()),
max(v[1] for v in agenda_id_min_max_datetime.values()),
)
booked_events = (
Event.objects.filter(
agenda__in=agendas,
start_datetime__gte=used_min_datetime - max_meeting_duration_td,
start_datetime__lte=used_max_datetime,
booking__user_external_id=user_external_id,
)
.exclude(booking__cancellation_datetime__isnull=False)
# ordering is important for the later groupby, it works like sort | uniq
.order_by('start_datetime', 'meeting_type__duration')
.values_list('start_datetime', 'meeting_type__duration')
)
# compute exclusion set by desk from all bookings, using
# itertools.groupby() to group them by desk_id
user_bookings = IntervalSet.from_ordered(
(
event_start_datetime,
event_start_datetime + datetime.timedelta(minutes=event_duration),
)
for event_start_datetime, event_duration in booked_events
)
unique_booked = {}
for time_period in base_agenda.get_effective_time_periods(base_min_datetime, base_max_datetime):
duration = (
datetime.datetime.combine(base_date, time_period.end_time)
- datetime.datetime.combine(base_date, time_period.start_time)
).seconds / 60
if duration < meeting_type.duration:
# skip time period that can't even hold a single meeting
continue
desks_by_min_max_datetime = collections.defaultdict(list)
for desk in time_period.desks:
min_max = desk_min_max_datetime.get(desk, agenda_id_min_max_datetime[desk.agenda_id])
desks_by_min_max_datetime[min_max].append(desk)
# aggregate agendas based on their real min/max_datetime :
# the get_time_slots() result is dependant upon these values, so even
# if we deduplicated a TimePeriod for some desks, if their respective
# agendas have different real min/max_datetime we must unduplicate them
# at time slot generation phase.
for (used_min_datetime, used_max_datetime), desks in desks_by_min_max_datetime.items():
for start_datetime in time_period.get_time_slots(
min_datetime=used_min_datetime,
max_datetime=used_max_datetime,
meeting_duration=meeting_duration,
base_duration=base_meeting_duration,
):
end_datetime = start_datetime + meeting_duration_td
timestamp = start_datetime.timestamp()
# skip generating datetimes if we already know that this
# datetime is available
if unique and unique_booked.get(timestamp) is False:
continue
for desk in sorted(desks, key=lambda desk: desk.label):
# ignore the slot for this desk, if it overlaps and exclusion period for this desk
excluded = desk in desks_exceptions and desks_exceptions[desk].overlaps(
start_datetime, end_datetime
)
if excluded:
continue
# slot is full if an already booked event overlaps it
# check resources first
booked = resources_bookings.overlaps(start_datetime, end_datetime)
# then check user boookings
booked_for_external_user = user_bookings.overlaps(start_datetime, end_datetime)
booked = booked or booked_for_external_user
# then bookings if resources are free
if not booked:
booked = desk.id in bookings and bookings[desk.id].overlaps(
start_datetime, end_datetime
)
if unique and unique_booked.get(timestamp) is booked:
continue
unique_booked[timestamp] = booked
yield TimeSlot(
start_datetime=start_datetime,
end_datetime=end_datetime,
desk=desk,
full=booked,
booked_for_external_user=booked_for_external_user,
)
if unique and not booked:
break
def get_agenda_detail(request, agenda, check_events=False):
agenda_detail = {
'id': agenda.slug,
@ -1075,8 +787,7 @@ class MeetingDatetimes(APIView):
def unique_slots():
last_slot = None
all_slots = list(
get_all_slots(
agenda,
agenda.get_all_slots(
meeting_type,
resources=resources,
unique=True,
@ -1498,8 +1209,7 @@ class Fillslots(APIView):
except (MeetingType.DoesNotExist, ValueError):
raise APIErrorBadRequest(N_('invalid meeting type id: %s'), meeting_type_id)
all_slots = sorted(
get_all_slots(
agenda,
agenda.get_all_slots(
meeting_type,
resources=resources,
user_external_id=user_external_id if exclude_user else None,

View File

@ -15,9 +15,35 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import bisect
import collections
import typing
Interval = collections.namedtuple('Interval', ['begin', 'end'])
class Interval(typing.NamedTuple):
begin: typing.Any
end: typing.Any
def disjoint(self, other):
return self < other or self > other
def overlaps(self, other):
return not self.disjoint(other)
def __lt__(self, other):
return self[1] < other[0]
def __gt__(self, other):
return other[1] < self[0]
def union(self, other):
other = self.cast(other)
assert self.overlaps(other)
return Interval(min(self.begin, other.begin), max(self.end, other.end))
@classmethod
def cast(cls, other):
if isinstance(other, cls):
return other
return cls(*other)
class IntervalSet:
@ -159,6 +185,9 @@ class IntervalSet:
return value
return cls(value)
def __rsub__(self, other):
return self.cast(other) - self
def __sub__(self, other):
l1 = iter(self)
l2 = iter(self.cast(other))
@ -200,6 +229,95 @@ class IntervalSet:
return self.__class__.from_ordered(gen())
def __radd__(self, other):
return self.cast(other).__add__(self)
def __add__(self, other):
l1 = iter(self)
l2 = iter(self.cast(other))
def gen():
state = 3
current = None
while True:
if state & 1:
c1 = next(l1, None)
if state & 2:
c2 = next(l2, None)
if current:
if not c1 and not c2:
yield current
break
if not c1:
if current < c2:
yield current
yield c2
break
if c2 < current:
yield c2
yield current
else:
yield current.union(c2)
break
if not c2:
if current < c1:
yield current
yield c1
break
if c1 < current:
yield c1
yield current
else:
yield current.union(c1)
break
if current < c1 and current < c2:
yield current
current = None
elif current.overlaps(c1) and current.overlaps(c2):
yield current.union(c1).union(c2)
current = None
state = 3
continue
elif current < c2:
yield current.union(c1)
current = None
state = 1
continue
else:
yield current.union(c2)
current = None
state = 2
continue
if not c1 and not c2:
# l1 and l2 are empty, stop
break
if not c1:
# l1 is empty, yield c2 and stop
yield c2
break
if not c2:
# l2 is empty, yield c1 and stop
yield c1
break
if c1 < c2:
# l1 is before l2, yield c1 and advance l1 only
yield c1
state = 1
continue
if c2 < c1:
# l2 is before l1, yield c2 and advance l2 only
yield c2
state = 2
continue
current = c1.union(c2)
state = 3
# finish by yielding from the not empty ones
yield from l1
yield from l2
return self.__class__.from_ordered(gen())
def min(self):
if self:
return self.begin[0]

View File

@ -36,8 +36,11 @@ from chrono.agendas.models import (
UnavailabilityCalendar,
VirtualMember,
)
from chrono.utils.interval import IntervalSet
from chrono.utils.timezone import localtime, make_aware, make_naive, now
from .utils import add_day_timeperiod, add_exception, add_meeting, build_agendas, paris, utc
pytestmark = pytest.mark.django_db
ICS_SAMPLE = """BEGIN:VCALENDAR
@ -116,7 +119,6 @@ END:VCALENDAR"""
INVALID_ICS_SAMPLE = """content
"""
with open('tests/data/atreal.ics') as f:
ICS_ATREAL = f.read()
@ -209,7 +211,7 @@ def test_agenda_minimal_booking_delay(freezer):
def test_agenda_minimal_booking_delay_in_working_days(settings, freezer):
freezer.move_to('2021-07-09')
agenda = Agenda.objects.create(label='Agenda', minimal_booking_delay=1)
agenda = build_agendas('meetings Agenda minimal_booking_delay=1')
settings.WORKING_DAY_CALENDAR = None
agenda.minimal_booking_delay_in_working_days = True
@ -298,7 +300,7 @@ def delay_parameter_to_label(argvalue):
)
def test_agenda_minimal_booking_delay_no_minimal_booking_time(freezer, current_time, min_booking_datetime):
freezer.move_to(current_time)
agenda = Agenda.objects.create(label='Agenda', minimal_booking_delay=4, minimal_booking_time=None)
agenda = build_agendas('meetings Agenda minimal_booking_delay=4 minimal_booking_time=None')
assert make_naive(agenda.min_booking_datetime) == min_booking_datetime
@ -694,13 +696,14 @@ def test_meeting_type_slugs():
def test_timeperiodexception_creation_from_ics():
agenda = Agenda.objects.create(label='Test 1 agenda')
desk = Desk.objects.create(label='Test 1 desk', agenda=agenda)
source = desk.timeperiodexceptionsource_set.create(
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE, name='sample.ics')
agenda = build_agendas(
'''\
meetings "Test 1 agenda"
desk "Test 1 desk"
exception-source sample.ics ICS_SAMPLE'''
)
source.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=desk).count() == 2
agenda._test_1_desk._sample_ics.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=agenda._test_1_desk).count() == 2
def test_timeperiodexception_creation_from_ics_without_startdt():
@ -1525,8 +1528,13 @@ def test_desk_duplicate_exception_sources():
}
)
def test_desk_duplicate_exception_source_from_settings():
agenda = Agenda.objects.create(label='Agenda')
desk = Desk.objects.create(label='Desk', agenda=agenda)
agenda = build_agendas(
'''\
meetings Agenda
desk Desk
'''
)
desk = agenda._desk
desk.import_timeperiod_exceptions_from_settings(enable=True)
source = desk.timeperiodexceptionsource_set.get(settings_slug='holidays')
@ -2254,30 +2262,25 @@ def test_agenda_reminders_templated_content(mailoutbox, freezer):
@override_settings(TIME_ZONE='UTC')
def test_agenda_reminders_meetings(mailoutbox, freezer):
freezer.move_to('2020-01-01 11:00')
agenda = Agenda.objects.create(label='Events', kind='meetings')
desk = Desk.objects.create(agenda=agenda, label='Desk')
meetingtype = MeetingType.objects.create(agenda=agenda, label='Bar', duration=30)
TimePeriod.objects.create(
desk=desk, weekday=now().weekday(), start_time=datetime.time(10, 0), end_time=datetime.time(18, 0)
freezer.move_to(utc('2020-01-01 11:00'))
agenda = build_agendas(
'''\
meetings Events
desk Desk
timeperiod wednesday 10:00-18:00
meeting-type Bar 30
reminder-setting days_before_email=2
'''
)
AgendaReminderSettings.objects.create(agenda=agenda, days_before_email=2)
event = Event.objects.create(
agenda=agenda,
places=1,
desk=desk,
meeting_type=meetingtype,
start_datetime=now() + datetime.timedelta(days=5), # 06/01
)
Booking.objects.create(
event=event,
add_meeting(
agenda,
start_datetime=utc('2020-01-06T11:00:00'),
user_email='t@test.org',
user_display_label='Birth certificate',
form_url='publik://default/someform/1/',
)
freezer.move_to('2020-01-04 15:00')
freezer.move_to(utc('2020-01-04 15:00'))
call_command('send_booking_reminders')
assert len(mailoutbox) == 1
@ -3635,3 +3638,145 @@ def test_shared_custody_agenda_unique_child_no_date_end():
SharedCustodyAgenda.objects.create(
first_guardian=father, second_guardian=mother, child=child, date_start=datetime.date(2023, 1, 1)
)
def test_meetings_agenda_get_free_time(db, freezer):
freezer.move_to(paris('2023-04-02 19:10'))
agenda = build_agendas(
'''
meetings Agenda 30
desk "Desk 1" monday-friday 08:00-12:00 14:00-17:00
unavailability-calendar Congés
exception Avril paris('2023-04-05 00:00:00 24h')
desk "Desk 2" monday-friday 09:00-12:00
unavailability-calendar Congés
desk "Desk 3" monday-friday 15:00-17:00
unavailability-calendar Congés
'''
)
full_free_time = IntervalSet(
[
paris('2023-04-03 08:00 4h'),
paris('2023-04-03 14:00 3h'),
paris('2023-04-04 08:00 4h'),
paris('2023-04-04 14:00 3h'),
paris('2023-04-06 08:00 4h'),
paris('2023-04-06 14:00 3h'),
paris('2023-04-07 08:00 4h'),
paris('2023-04-07 14:00 3h'),
paris('2023-04-08 08:00 2h'),
]
)
def closed():
return full_free_time - agenda.get_free_time(end_datetime=paris('2023-04-09 19:10'))
add_day_timeperiod(agenda._desk_1, paris('2023-04-08 08:00 2h'))
assert agenda.get_free_time(end_datetime=paris('2023-04-09 19:10')) == full_free_time
assert full_free_time - agenda.get_free_time(end_datetime=paris('2023-04-09 19:10')) == []
assert closed() == []
assert (
agenda.get_free_time(start_datetime=paris('2023-04-03 13:00'), end_datetime=paris('2023-04-09 19:10'))
== list(full_free_time)[1:]
)
# desk1 is closed on monday 3th april form 8' to 12' but
# desk2 is still open from 9' to 12', so free time is only
# diminished of the 8' to 9' hour.
add_exception(agenda._desk_1, paris('2023-04-03 08:00 4h'))
assert closed() == [paris('2023-04-03 08:00 1h')]
# now close desk2 from 11' to 12' on monday 3th
add_exception(agenda._desk_2, paris('2023-04-03 11:00 1h'))
assert closed() == [paris('2023-04-03 08:00 1h'), paris('2023-04-03 11:00 1h')]
# add a meeting on tuesday 4th april on desk1 at 14'30
add_meeting(agenda._desk_1, paris('2023-04-04 14:30'))
# an event on 5th at 15' but slot should still be open on desk3
add_meeting(agenda._desk_1, paris('2023-04-05 15:00'))
# two events on 6th at 15', slot is closed
add_meeting(agenda._desk_1, paris('2023-04-06 15:00'))
add_meeting(agenda._desk_3, paris('2023-04-06 15:00'))
assert closed() == [
paris('2023-04-03 08:00 1h'),
paris('2023-04-03 11:00 1h'),
paris('2023-04-04 14:30 30m'),
paris('2023-04-06 15:00 30m'),
]
def test_virtual_agenda_get_free_time(db, freezer):
freezer.move_to(paris('2023-04-02 19:10'))
# context:
# * a virtual agenda, containing:
# * three real agendas, with timperiods of monday to friday and a 30 minutes meeting type
# - agenda 1. 8-12h/14-17h
# - agenda 2. 9-12h
# - agenda 3. 15-17h
# * looking at week from monday 3th april 2023 to friday 7th april 2023
agenda = build_agendas(
'''
virtual Agenda 30
meetings Agenda-1
desk Desk-1 monday-friday 08:00-12:00 14:00-17:00
unavailability-calendar Congés
exception Avril paris('2023-04-05 00:00:00 24h')
meetings Agenda-2
desk Desk-1 monday-friday 09:00-12:00
unavailability-calendar Congés
meetings Agenda-3
desk Desk-1 monday-friday 15:00-17:00
unavailability-calendar Congés
'''
)
full_free_time = IntervalSet(
[
paris('2023-04-03 08:00 4h'),
paris('2023-04-03 14:00 3h'),
paris('2023-04-04 08:00 4h'),
paris('2023-04-04 14:00 3h'),
paris('2023-04-06 08:00 4h'),
paris('2023-04-06 14:00 3h'),
paris('2023-04-07 08:00 4h'),
paris('2023-04-07 14:00 3h'),
paris('2023-04-08 08:00 2h'),
]
)
def closed():
return full_free_time - agenda.get_free_time(end_datetime=paris('2023-04-09 19:10'))
add_day_timeperiod(agenda._agenda_1, paris('2023-04-08 08:00 2h'))
assert agenda.get_free_time(end_datetime=paris('2023-04-09 19:10')) == full_free_time
assert (
agenda.get_free_time(start_datetime=paris('2023-04-03 13:00'), end_datetime=paris('2023-04-09 19:10'))
== list(full_free_time)[1:]
)
# agenda1.desk is closed on monday 3th april form 8' to 12' but
# agenda2.desk is still open from 9' to 12', so free time is only
# diminished of the 8' to 9' hour.
add_exception(agenda._agenda_1, paris('2023-04-03 08:00 4h'))
assert closed() == [paris('2023-04-03 08:00 1h')]
# now close agenda2.desk from 11' to 12' on monday 3th
add_exception(agenda._agenda_2, paris('2023-04-03 11:00 1h'))
assert closed() == [paris('2023-04-03 08:00 1h'), paris('2023-04-03 11:00 1h')]
# add a meeting on tuesday 4th april on agenda1.desk at 14'30
add_meeting(agenda._agenda_1, paris('2023-04-04 14:30'))
# an event on 5th at 15' but slot should still be open on agenda3
add_meeting(agenda._agenda_1, paris('2023-04-05 15:00'))
# two events on 6th at 15', slot is closed
add_meeting(agenda._agenda_1, paris('2023-04-06 15:00'))
add_meeting(agenda._agenda_3, paris('2023-04-06 15:00'))
assert closed() == [
paris('2023-04-03 08:00 1h'),
paris('2023-04-03 11:00 1h'),
paris('2023-04-04 14:30 30m'),
paris('2023-04-06 15:00 30m'),
]

View File

@ -1,6 +1,11 @@
import pytest
from chrono.interval import Interval, IntervalSet
from chrono.utils.interval import Interval, IntervalSet
def test_interval_union():
assert Interval(1, 2).union((2, 3)) == (1, 3)
assert Interval(1, 2).union((2, 3)) == Interval(1, 3)
def test_interval_set_merge_adjacent():
@ -67,6 +72,8 @@ def test_interval_set_sub():
assert (s - []) == s
assert (IntervalSet([(0, 2)]) - [(1, 2)]) == [(0, 1)]
assert ([] - s) == []
def test_interval_set_min_max():
assert IntervalSet().min() is None
@ -86,3 +93,25 @@ def test_interval_set_eq():
assert not IntervalSet([(1, 2)]) == None # noqa pylint: disable=singleton-comparison
# noqa pylint: disable=singleton-comparison
assert not None == IntervalSet([(1, 2)])
def test_interval_set_add():
s = IntervalSet([(0, 3), (4, 7), (8, 11), (12, 15)])
t = IntervalSet([(3, 4), (7, 8), (11, 12)])
assert s + t == [(0, 15)]
assert t + s == [(0, 15)]
assert [(3, 4), (7, 8), (11, 12)] + s == [(0, 15)]
t = IntervalSet([(3, 4), (11, 12)])
assert s + t == [(0, 7), (8, 15)]
assert t + s == [(0, 7), (8, 15)]
t = IntervalSet([(2, 5), (10, 13)])
assert s + t == [(0, 7), (8, 15)]
assert t + s == [(0, 7), (8, 15)]
assert s + [] == s
assert [] + s == s
assert IntervalSet([(1, 3), (4, 6)]) + [(3, 4), (4, 5)] == [(1, 6)]

View File

@ -2,13 +2,16 @@ import datetime
import json
from unittest import mock
import pytest
from requests.exceptions import ConnectionError
from requests.models import Response
from chrono.agendas.models import Agenda
from chrono.agendas.models import Agenda, TimePeriod
from chrono.utils.date import get_weekday_index
from chrono.utils.lingo import CheckType, get_agenda_check_types
from .utils import build_agendas, build_meetings_agenda, build_virtual_agenda, paris, utc
def test_get_weekday_index():
for date in (
@ -90,3 +93,176 @@ def test_get_agenda_check_types():
CheckType(slug='bar-reason', label='Bar reason', kind='presence'),
CheckType(slug='foo-reason', label='Foo reason', kind='absence'),
]
def test_build_meetings_agenda(db):
agenda = build_meetings_agenda(
meeting_types=[30], resources=['Re1'], desks=('Desk 1', 'monday-friday 09:00-12:00 14:00-17:00')
)
assert agenda.slug == 'agenda'
assert agenda.label == 'Agenda'
assert agenda._mt_30
assert list(agenda.meetingtype_set.all()) == [agenda._mt_30]
assert agenda.desk_set.count() == 1
assert agenda.desk_set.all()[0].slug == 'desk-1'
timeperiods = agenda.desk_set.all()[0].timeperiod_set.all()
assert timeperiods.count() == 10
assert set(timeperiods.values_list('weekday', flat=True)) == set(range(0, 5))
assert set(timeperiods.values_list('start_time', 'end_time')) == {
(datetime.time(9), datetime.time(12)),
(datetime.time(14), datetime.time(17)),
}
assert agenda.resources.count() == 1
assert agenda.resources.get().label == 'Re1'
assert agenda.resources.get().slug == 're1'
def test_build_meetings_agenda_multiple_desks(db):
agenda = build_meetings_agenda(
meeting_types=[30],
desks={
'desk-1': ['monday-friday 09:00-12:00'],
'desk-2': ['monday-friday 14:00-17:00'],
},
)
desks = agenda.desk_set.all()
assert set(desks.values_list('slug', flat=True)) == {'desk-1', 'desk-2'}
assert agenda.desk_set.all()[0].slug == 'desk-1'
timeperiods = TimePeriod.objects.filter(desk__in=desks)
assert timeperiods.count() == 10
assert set(timeperiods.values_list('weekday', flat=True)) == set(range(0, 5))
assert set(timeperiods.values_list('start_time', 'end_time')) == {
(datetime.time(9), datetime.time(12)),
(datetime.time(14), datetime.time(17)),
}
def test_build_virtual_agenda(db):
agenda = build_virtual_agenda(
agendas={
'Agenda 1': {
'desks': ('Bureau 1', 'monday-friday 08:00-12:00 14:00-17:00'),
},
'Agenda 2': {
'desks': ('Bureau 1', 'monday,tuesday 09:00-12:00'),
},
'Agenda 3': {
'desks': ('Bureau 1', 'monday-friday 15:00-17:00'),
},
},
meeting_types=[30],
)
assert agenda._agenda_1
assert agenda._agenda_1._mt_30
assert agenda._agenda_2
assert agenda._agenda_3
assert agenda._mt_30
assert agenda.real_agendas.count() == 3
timeperiods = TimePeriod.objects.filter(desk__agenda__in=agenda.real_agendas.all())
assert timeperiods.count() == 17
assert set(timeperiods.values_list('weekday', flat=True)) == set(range(0, 5))
assert set(timeperiods.values_list('start_time', 'end_time')) == {
(datetime.time(8), datetime.time(12)),
(datetime.time(9), datetime.time(12)),
(datetime.time(14), datetime.time(17)),
(datetime.time(15), datetime.time(17)),
}
assert agenda._agenda_1
def test_build_agendas(db):
# pylint: disable=unused-variable
ICS_SAMPLE = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20170824T082855Z
DTSTART:20170831T170800Z
DTEND:20170831T203400Z
SEQUENCE:1
SUMMARY:Événement 1
END:VEVENT
BEGIN:VEVENT
DTSTAMP:20170824T092855Z
DTSTART:20170830T180800Z
DTEND:20170831T223400Z
SEQUENCE:2
END:VEVENT
END:VCALENDAR"""
unavailability_calendar, agenda_1, agenda_2, virtual = build_agendas(
'''
unavailability-calendar Congés
exception Noël
start_datetime paris('2023-12-25T00:00:00 24h')
exception-source sample.ics ICS_SAMPLE
meetings "Agenda 1" maximal_booking_delay=15 30 45 # comment 1
# comment 2
desk "Desk 1"
timeperiod monday-friday 08:00-12:00
exception Grève
start_datetime paris('2023-04-01T01:01:01')
end_datetime paris('2023-04-01T01:01:01')
exception-source sample.ics ICS_SAMPLE
unavailability-calendar Congés
desk "Bureau 2"
timeperiod monday-friday 14:00-17:00
meetings 'Agenda 2' 30
desk "Desk 1" monday-friday 08:00-12:00
desk "Desk 2" monday,friday 14:00-17:00
virtual "Agenda 3" 30
meetings CNI
desk bureau1 monday-friday 10:00-12:00
meetings Passeport
desk bureau2 monday-friday 14:00-17:00
'''
)
assert unavailability_calendar.label == 'Congés'
assert unavailability_calendar._noel
assert unavailability_calendar._sample_ics
assert agenda_1.label == 'Agenda 1'
assert agenda_1.maximal_booking_delay == 15
assert agenda_1._desk_1._greve
assert agenda_1._desk_1._sample_ics
assert agenda_1._desk_1
assert agenda_1._desk_1._conges == unavailability_calendar
assert agenda_1._bureau_2
assert agenda_1._mt_30
assert agenda_1._mt_45
assert agenda_2.label == 'Agenda 2'
assert agenda_2._desk_1
assert agenda_2._desk_2
assert agenda_2._mt_30
assert virtual._cni._bureau1
assert virtual._cni._mt_30
assert virtual._mt_30
assert virtual._passeport._bureau2
assert virtual._passeport._mt_30
def test_build_agendas_indentation_mismatch(db):
with pytest.raises(SyntaxError, match=r'on line 5'):
build_agendas(
'''
agenda xxx
zobi dd
kkk
iii # here, bad indentation
agenda "Agenda 1" 30
desk 1
desk 2
'''
)
def test_paris():
assert paris('2023-04-19T11:00:00').isoformat() == '2023-04-19T11:00:00+02:00'
def test_utc():
assert utc('2023-04-19T11:00:00').isoformat() == '2023-04-19T11:00:00+00:00'

View File

@ -1,3 +1,46 @@
# chrono - agendas system
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import re
import sys
import textwrap
import typing
import zoneinfo
from django.core.files.base import ContentFile
from django.utils.text import slugify
from chrono.agendas.models import (
Agenda,
AgendaReminderSettings,
Booking,
Desk,
Event,
MeetingType,
Resource,
TimePeriod,
TimePeriodException,
TimePeriodExceptionSource,
UnavailabilityCalendar,
)
from chrono.utils.interval import Interval
from chrono.utils.timezone import localtime
def login(app, username='admin', password='admin'):
login_page = app.get('/login/')
login_form = login_page.forms[0]
@ -6,3 +49,549 @@ def login(app, username='admin', password='admin'):
resp = login_form.submit()
assert resp.status_int == 302
return app
def build_timeperiods(desk, *timeperiods):
'''Build timeperiods for a desk. timperiods is a list of string with
format:
monday-friday;9:00-12:00,14:00-17:00
saturday;9:00-12:00
'''
for defn in timeperiods:
if isinstance(defn, str):
defn = defn.strip()
parts = defn.split()
else:
parts = defn
days = []
openings = []
first_opening = ([i for i, part in enumerate(parts) if part[:1].isdigit()] or [len(parts)])[0]
days = parts[:first_opening]
openings = parts[first_opening:]
assert days and openings, defn
for day in days:
day_to_weekday = {
'monday': 0,
'tuesday': 1,
'wednesday': 2,
'thursday': 3,
'friday': 4,
'saturday': 5,
'sunday': 6,
}
if '-' in day:
begin, end = day.split('-')
begin_weekday = day_to_weekday[begin]
end_weekday = day_to_weekday[end]
assert begin_weekday < end_weekday, f'{begin} must be before {end}: {day}'
weekdays = range(begin_weekday, end_weekday + 1)
else:
weekdays = [datetime.datetime.strptime(d, '%A').weekday() for d in day.split(',')]
for weekday in weekdays:
for opening in openings:
start, end = opening.split('-')
start_time = datetime.datetime.strptime(start, '%H:%M').time()
end_time = datetime.datetime.strptime(end, '%H:%M').time()
TimePeriod.objects.create(
desk=desk, weekday=weekday, start_time=start_time, end_time=end_time
)
def build_agenda(kind, label='Agenda', slug=None, **kwargs):
agenda_kwargs = {
'label': label,
'slug': slug or slugify(label),
'kind': kind,
}
agenda_kwargs.update(kwargs)
return Agenda.objects.create(**agenda_kwargs)
def build_meetings_agenda(
label='Agenda',
slug=None,
meeting_types: typing.Union[int, tuple[str, int]] = None,
desks: typing.Union[str, dict] = None,
resources=None,
reminder_settings: list = None,
**kwargs,
):
agenda = build_agenda('meetings', label=label, slug=slug, **(kwargs or {}))
for meeting_type in meeting_types or []:
if isinstance(meeting_type, (int, str)):
meeting_type = {'slug': meeting_type}
add_meeting_type(agenda, **meeting_type)
if isinstance(desks, tuple):
desks = [desks]
if desks and isinstance(desks, list) and isinstance(desks[0], tuple):
desks = dict(desks)
if isinstance(desks, dict):
desks = [{'label': key, 'timeperiods': value} for key, value in desks.items()]
for desk_kwargs in desks or []:
add_desk(agenda, **desk_kwargs)
for reminder_setting in reminder_settings or []:
AgendaReminderSettings.objects.create(agenda=agenda, **reminder_setting)
for label in resources or []:
slug = slugify(label)
resource, _ = Resource.objects.get_or_create(slug=slugify(label), defaults={'label': label})
agenda.resources.add(resource)
setattr(agenda, f'_re_{slug.replace("-", "_")}', resource)
return agenda
def build_virtual_agenda(
label: str = 'Agenda',
slug: str = None,
agendas: list[Agenda] = None,
meeting_types: list = None,
**kwargs,
):
agenda = build_agenda('virtual', label=label, slug=slug, **(kwargs or {}))
if isinstance(agendas, dict):
agendas = [{'label': label, **defn} for label, defn in agendas.items()]
for agenda_defn in agendas or []:
if isinstance(agenda_defn, Agenda):
real_agenda = agenda_defn
else:
agenda_defn['meeting_types'] = agenda_defn.get('meeting_types') or meeting_types
real_agenda = build_meetings_agenda(**agenda_defn)
agenda.real_agendas.add(real_agenda)
setattr(agenda, f'_{real_agenda.slug.replace("-", "_")}', real_agenda)
for virtual_meeting_type in agenda.iter_meetingtypes():
virtual_meeting_type.agenda = agenda
setattr(agenda, f'_{virtual_meeting_type.slug.replace("-", "_")}', virtual_meeting_type)
return agenda
def build_unavailability_calendar(label: str, slug: str = None, exceptions=None, exception_sources=None):
slug = slug or slugify(label)
unavailability_calendar, dummy = UnavailabilityCalendar.objects.update_or_create(
slug=slug, defaults={'label': label}
)
for exception in exceptions or []:
add_exception(unavailability_calendar, **exception)
for exception in exception_sources or []:
add_exception_source(unavailability_calendar, **exception)
return unavailability_calendar
def add_unavailability_calendar(desk: Desk, **kwargs):
unavailability_calendar = build_unavailability_calendar(**kwargs)
unavailability_calendar.desks.add(desk)
setattr(desk, f'_{unavailability_calendar.slug.replace("-", "_")}', unavailability_calendar)
return unavailability_calendar
def add_desk(
agenda: Agenda,
label: str,
timeperiods=None,
exceptions=None,
exception_sources=None,
unavailability_calendars=None,
):
desk = Desk.objects.create(label=label, agenda=agenda, slug=slugify(label))
setattr(agenda, f'_{desk.slug.replace("-", "_")}', desk)
if timeperiods:
if not isinstance(timeperiods, list):
timeperiods = [timeperiods]
try:
build_timeperiods(desk, *timeperiods)
except Exception as e:
raise ValueError(f'invalid timeperiods "{e}": {timeperiods}')
for exception in exceptions or []:
add_exception(desk, **exception)
for exception_source in exception_sources or []:
add_exception_source(desk, **exception_source)
for unavailability_calendar in unavailability_calendars or []:
add_unavailability_calendar(desk, **unavailability_calendar)
return desk
def add_meeting_type(agenda: Agenda, slug: str, duration: int = None):
if duration is None:
if isinstance(slug, int) or ':' not in slug:
duration = int(slug)
slug = f'mt-{duration}'
else:
slug, duration = slug.split(':')
duration = int(duration)
meeting_type = MeetingType.objects.create(agenda=agenda, slug=slug, duration=duration)
setattr(agenda, f'_{meeting_type.slug.replace("-", "_")}', meeting_type)
return meeting_type
def add_day_timeperiod(
target: typing.Union[Desk, Agenda],
start: typing.Union[datetime.datetime, Interval],
end: typing.Union[datetime.datetime, int] = None,
):
if isinstance(target, Desk):
desk = target
else:
assert target.kind != 'events'
desk = target.desk_set.get()
if isinstance(start, Interval):
start, end = start
elif isinstance(end, int):
end = start + datetime.timdelta(minutes=end)
assert localtime(start).date() == localtime(end).date()
desk.timeperiod_set.create(
date=localtime(start).date(),
start_time=localtime(start).time().replace(tzinfo=None),
end_time=localtime(end).time().replace(tzinfo=None),
)
def add_exception(
target: typing.Union[Desk, Agenda, UnavailabilityCalendar],
start_datetime: typing.Union[datetime.datetime, Interval],
end_datetime: typing.Union[datetime.datetime, int] = None,
label: str = None,
):
desk = None
unavailability_calendar = None
if isinstance(target, Desk):
desk = target
elif isinstance(target, UnavailabilityCalendar):
unavailability_calendar = target
else:
assert target.kind != 'events'
desk = target.desk_set.get()
if isinstance(start_datetime, Interval):
start_datetime, end_datetime = start_datetime
elif isinstance(end_datetime, int):
end_datetime = start_datetime + datetime.timdelta(minutes=end_datetime)
timeperiodexception, _ = TimePeriodException.objects.get_or_create(
desk=desk,
unavailability_calendar=unavailability_calendar,
label=label,
start_datetime=start_datetime,
end_datetime=end_datetime,
unavailability_calendar__isnull=True,
group__isnull=True,
)
if label:
setattr(target, f'_{slugify(label)}', timeperiodexception)
return timeperiodexception
def add_exception_source(target: typing.Union[Desk, Agenda], ics_filename: str, ics_file):
desk = None
unavailability_calendar = None
if isinstance(target, Desk):
desk = target
elif isinstance(target, UnavailabilityCalendar):
unavailability_calendar = target
else:
assert target.kind != 'events'
desk = target.desk_set.get()
if isinstance(ics_file, str):
ics_file = ics_file.encode()
if isinstance(ics_file, bytes):
ics_file = ContentFile(ics_file, name=ics_filename)
timeperiodexceptionsource = TimePeriodExceptionSource.objects.create(
desk=desk,
unavailability_calendar=unavailability_calendar,
ics_filename=ics_filename,
ics_file=ics_file,
)
setattr(target, f'_{slugify(ics_filename.replace(".", "_"))}', timeperiodexceptionsource)
return timeperiodexceptionsource
def add_meeting(
target: typing.Union[Desk, Agenda],
start_datetime: datetime.datetime,
meeting_type: MeetingType = None,
**kwargs,
):
if isinstance(target, Desk):
desk = target
else:
assert target.kind != 'events'
desk = target.desk_set.get()
if meeting_type is None:
meeting_type = desk.agenda.meetingtype_set.get()
event = Event.objects.create(
agenda=desk.agenda,
desk=desk,
meeting_type=meeting_type,
start_datetime=start_datetime,
full=False,
places=1,
)
booking = Booking.objects.create(event=event, **kwargs)
event.booking = booking
return event
PARIS_TZ = zoneinfo.ZoneInfo('Europe/Paris')
UTC = zoneinfo.ZoneInfo('UTC')
def datetime_in_tz(s, tz):
duration = None
if ':' not in s.rsplit(' ')[-1]:
s, duration = s.rsplit(' ', 1)
if duration.endswith('h'):
duration = datetime.timedelta(hours=int(duration[:-1]))
elif duration.endswith('m'):
duration = datetime.timedelta(minutes=int(duration[:-1]))
else:
raise ValueError(f'bad datetime string: {s}')
dt = datetime.datetime.fromisoformat(s)
assert not dt.tzinfo, 's must not contain timezone offset'
dt = dt.replace(tzinfo=tz)
if duration:
return Interval(dt, dt + duration)
return dt
def build_agendas(description):
'''Mini-language to build agendas'''
regexp = re.compile(r'(?:\'[^\']+\'|"[^"]+"|[^ \'"]+)+')
call_frame = sys._getframe(1)
description = textwrap.dedent(description)
class Node(typing.NamedTuple):
tokens: list[str]
nodes: list
line_number: int
@classmethod
def from_tokens(cls, tokens, line_number):
return cls(tokens=tokens, nodes=[], line_number=line_number)
class Stack(typing.NamedTuple):
level: int
nodes: list
nodes = []
stack = [Stack(0, nodes)]
lines = list(enumerate(description.splitlines(), start=1))
def eval_value(value):
return eval(value, call_frame.f_globals, call_frame.f_locals) # pylint: disable=eval-used
def eval_label(value):
if value[0] in '"\'':
return value[1:-1]
return value
def syntax_error(msg, line_number=None, node=None):
line_number = line_number or node.line_number
parts = [f'{msg} on line {line_number}']
parts += [
f'{"> " if i == line_number else " "}{i}: {line}'
for i, line in lines[max(line_number - 4, 0) : min(line_number + 3, len(lines))]
]
return SyntaxError('\n'.join(parts))
for line_number, line in lines:
indentation = len(line) - len(line.lstrip())
tokens = []
for token in regexp.findall(line.strip()):
# comment
if token.startswith('#'):
break
if token:
tokens.append(token)
if not tokens:
continue
level = stack[-1].level
if indentation == stack[-1].level:
pass
elif indentation > level: # indent
stack.append(Stack(level=indentation, nodes=stack[-1].nodes[-1].nodes))
else: # deindent
while indentation < level:
stack.pop()
level = stack[-1].level
if level != indentation:
raise syntax_error('indentation mismatch', line_number=line_number)
stack[-1].nodes.append(Node.from_tokens(tokens, line_number))
def parse_unavailability_calendar(node):
dummy, *rest = node.tokens
try:
label, slug, *rest = rest
kwargs = {'label': label, 'slug': slug}
except ValueError:
(label,) = rest
kwargs = {'label': label}
exceptions = []
exception_sources = []
for node in node.nodes:
kind, *dummy = node.tokens
if kind == 'exception':
exceptions.append(parse_exception(node))
elif kind == 'exception-source':
exception_sources.append(parse_exception_source(node))
return {**kwargs, 'exceptions': exceptions, 'exception_sources': exception_sources}
def parse_node(node):
kind, *dummy = node.tokens
try:
if kind == 'meetings':
return build_meetings_agenda(**parse_meetings(node))
if kind == 'virtual':
return build_virtual_agenda(**parse_virtual(node))
if kind == 'unavailability-calendar':
return build_unavailability_calendar(**parse_unavailability_calendar(node))
raise ValueError('unknown node type')
except Exception as e:
raise syntax_error(e, node=node)
def parse_kwargs(tokens):
kwargs = {}
for token in tokens:
key, value = token.split('=', 1)
kwargs[key] = eval_value(value)
return kwargs
def parse_agenda_kwargs(tokens):
meeting_types = []
kwargs = {'meeting_types': meeting_types}
for token in tokens:
if not meeting_types:
if '=' in token:
key, value = token.split('=', 1)
kwargs[key] = eval_value(value)
continue
try:
meeting_types.append(int(token))
except Exception:
raise ValueError(f'invalid meeting type duration "{token}"')
return kwargs
def parse_exception(node):
dummy, *rest = node.tokens
kwargs = {}
try:
(label,) = rest
kwargs['label'] = eval_label(rest[0])
except ValueError:
try:
label, start_datetime = rest
start_datetime = eval_value(start_datetime)
return {'label': label, 'start_datetime': start_datetime}
except ValueError:
label, start_datetime, end_datetime, *rest = rest
start_datetime = eval_value(start_datetime)
end_datetime = eval_value(end_datetime)
return {'label': label, 'start_datetime': start_datetime, 'end_datetime': end_datetime}
else:
for node in node.nodes:
key, *rest = node.tokens
kwargs[key] = eval_value(' '.join(rest))
return kwargs
def parse_exception_source(node):
dummy, filename, *rest = node.tokens
return {'ics_filename': filename, 'ics_file': eval_value(' '.join(rest))}
def parse_desk(node):
kind, label, *inline_timeperiod = node.tokens
if kind != 'desk':
raise syntax_error('invalid desk description', node=node)
timeperiods = []
exceptions = []
exception_sources = []
unavailability_calendars = []
for node in node.nodes:
kind, *dummy = node.tokens
if kind == 'timeperiod':
timeperiods.append(parse_timeperiod(node))
elif kind == 'exception':
exceptions.append(parse_exception(node))
elif kind == 'exception-source':
exception_sources.append(parse_exception_source(node))
elif kind == 'unavailability-calendar':
unavailability_calendars.append(parse_unavailability_calendar(node))
else:
raise ValueError('unknown desk child node')
if inline_timeperiod:
timeperiods.insert(0, inline_timeperiod)
return {
'label': eval_label(label),
'timeperiods': timeperiods,
'exceptions': exceptions,
'exception_sources': exception_sources,
'unavailability_calendars': unavailability_calendars,
}
def parse_timeperiod(node):
kind, *defn = node.tokens
if kind != 'timeperiod':
raise syntax_error('invalid timeperiod description', node=node)
return defn
def parse_mt(node):
dummy, slug, *rest = node.tokens
if rest:
return {'slug': slug, 'duration': int(rest[0])}
else:
return slug
def parse_reminder_setting(node):
dummy, *rest = node.tokens
return parse_kwargs(rest)
def parse_meetings(node):
dummy, label, *rest = node.tokens
kwargs = parse_agenda_kwargs(rest)
desks = []
meeting_types = kwargs.setdefault('meeting_types', [])
for node in node.nodes:
kind, *rest = node.tokens
if kind == 'desk':
desks.append(parse_desk(node))
elif kind == 'meeting-type':
meeting_types.append(parse_mt(node))
elif kind == 'reminder-setting':
kwargs.setdefault('reminder_settings', []).append(parse_reminder_setting(node))
else:
raise ValueError('unknown meeting children')
return {**kwargs, 'label': eval_label(label), 'desks': desks}
def parse_virtual(node):
dummy, label, *rest = node.tokens
kwargs = parse_agenda_kwargs(rest)
return {**kwargs, 'label': eval_label(label), 'agendas': list(map(parse_meetings, node.nodes))}
objects = [parse_node(node) for node in nodes]
if len(objects) > 1:
return objects
else:
return objects[0]
def paris(s):
return datetime_in_tz(s, tz=PARIS_TZ)
def utc(s):
return datetime_in_tz(s, tz=UTC)