api: optimize get_all_slots() and around it (#42169)

Workflow in get_all_slots() is simplified :
* first we accumulate, for each desk, the set of time slots when a booking cannot
occur or is already booked,
* then we generate the list of possible time slots and match them to the
exclusion and already booked set.

Intervals is replaced by a simpler data-structure, IntervalSet, it does
not need to be a map, a simple set is enough.

Also :
* moved TimePeriod.get_effective_timeperiods() to the agenda level , it
deduplictes TimePeriod between desks and remove excluded TimePeriod for
virtual agendas.

* added a named-tuple WeekTime to represent a TimePeriod base unit, so
we can use them in IntervalSet easily (as they can be compared) to
compute the effective time periods,

* the fact that base_duration is unique for a given virtual agenda is
now accounted and stated everywhere,

* the fact that generated time slots must have time in the local
timezone for the API to work is now stated everywhere,

* In get_all_slots(), also :
  * integrated the code of get_exceptions_by_desk() into get_all_slots()
  to further reduce the number of SQL queries.
  * used_min/max_datetime is reduced by the exclusion periods, and
    effective time periods are grouped based on the used_min/max_datetime of
    each agenda.
  * pre-filter slots for uniqueness when generating available datetimes
    (but for filling slot we still need exact availability information
    for each desk)
This commit is contained in:
Benjamin Dauvergne 2020-04-30 12:16:38 +02:00
parent 3c5d056fa3
commit ece063b2b3
9 changed files with 759 additions and 524 deletions

View File

@ -16,11 +16,15 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import collections
import datetime
import functools
import itertools
import math
import uuid
import requests
import vobject
import uuid
import django
from django.conf import settings
@ -33,14 +37,13 @@ from django.urls import reverse
from django.utils.dates import WEEKDAYS
from django.utils.encoding import force_text
from django.utils.formats import date_format
from django.utils.functional import cached_property
from django.utils.text import slugify
from django.utils.timezone import localtime, now, make_aware, make_naive, is_aware
from django.utils.translation import ugettext_lazy as _
from jsonfield import JSONField
from ..interval import Intervals
from chrono.interval import Interval, IntervalSet
AGENDA_KINDS = (
@ -296,8 +299,71 @@ class Agenda(models.Model):
return created
def get_effective_time_periods(self):
'''Regroup timeperiods by desks.
List all timeperiods, timeperiods having the same begin_time and
end_time are regrouped in a SharedTimePeriod object, which has a
list of desks instead of only one desk.
'''
if self.kind == 'virtual':
return self.get_effective_time_periods_virtual()
elif self.kind == 'meetings':
return self.get_effective_time_periods_meetings()
else:
raise ValueError('does not work with kind %r' % self.kind)
def get_effective_time_periods_meetings(self):
'''List timeperiod instances for all desks of the agenda, convert them
into an Interval of WeekTime which can be compared and regrouped using
itertools.groupby.
'''
yield from (
SharedTimePeriod.from_weektime_interval(
weektime_interval, desks=[time_period.desk for time_period in time_periods],
)
for weektime_interval, time_periods in itertools.groupby(
TimePeriod.objects.filter(desk__agenda=self)
.prefetch_related('desk')
.order_by('weekday', 'start_time', 'end_time'),
key=TimePeriod.as_weektime_interval,
)
)
def get_effective_time_periods_virtual(self):
'''List timeperiod instances for all desks of all real agendas of this
virtual agenda, convert them into an Interval of WeekTime which can be
compared and regrouped using itertools.groupby.
'''
closed_hours_by_days = IntervalSet.from_ordered(
[
time_period.as_weektime_interval()
for time_period in self.excluded_timeperiods.order_by('weekday', 'start_time', 'end_time')
]
)
for time_period_interval, time_periods in itertools.groupby(
TimePeriod.objects.filter(desk__agenda__virtual_agendas=self)
.order_by('weekday', 'start_time', 'end_time')
.prefetch_related('desk'),
key=lambda tp: tp.as_weektime_interval(),
):
time_periods = list(time_periods)
desks = [time_period.desk for time_period in time_periods]
if not closed_hours_by_days:
yield SharedTimePeriod.from_weektime_interval(time_period_interval, desks=desks)
else:
for weektime_interval in IntervalSet.simple(*time_period_interval) - closed_hours_by_days:
yield SharedTimePeriod.from_weektime_interval(weektime_interval, desks=desks)
class VirtualMember(models.Model):
'''Trough model to link virtual agendas to their real agendas.
Real agendas linked to a virtual agenda MUST all have the same list of
MeetingType based on their label, slug and duration. It's enforced by
VirtualMember.clean() and the realted management views.
'''
virtual_agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE, related_name='real_members')
real_agenda = models.ForeignKey(
Agenda, on_delete=models.CASCADE, related_name='virtual_members', verbose_name='Agenda'
@ -353,22 +419,12 @@ class VirtualMember(models.Model):
WEEKDAYS_LIST = sorted(WEEKDAYS.items(), key=lambda x: x[0])
class TimeSlot(object):
def __init__(self, start_datetime, meeting_type, desk):
self.start_datetime = start_datetime
self.end_datetime = start_datetime + datetime.timedelta(minutes=meeting_type.duration)
self.meeting_type = meeting_type
self.desk = desk
class WeekTime(collections.namedtuple('WeekTime', ['weekday', 'time'])):
'''Representation of a time point in a weekday, ex.: Monday at 5 o'clock.
'''
@cached_property
def id(self):
return '%s:%s' % (
self.meeting_type.id or self.meeting_type.slug,
self.start_datetime.strftime('%Y-%m-%d-%H%M'),
)
def __str__(self):
return date_format(self.start_datetime, format='DATETIME_FORMAT')
def __repr__(self):
return '%s / %s' % (force_text(WEEKDAYS[self.weekday]), date_format(self.time, 'TIME_FORMAT'),)
class TimePeriod(models.Model):
@ -422,56 +478,96 @@ class TimePeriod(models.Model):
return new_period
def get_effective_timeperiods(self, excluded_timeperiods):
effective_timeperiods = [self]
for excluded_timeperiod in excluded_timeperiods:
res = []
for effective_timeperiod in effective_timeperiods:
if (
excluded_timeperiod.weekday != effective_timeperiod.weekday
or excluded_timeperiod.start_time >= effective_timeperiod.end_time
or excluded_timeperiod.end_time <= effective_timeperiod.start_time
):
res.append(effective_timeperiod)
continue
if (
excluded_timeperiod.start_time <= effective_timeperiod.start_time
and excluded_timeperiod.end_time >= effective_timeperiod.end_time
):
# completely exclude
continue
if excluded_timeperiod.start_time > effective_timeperiod.start_time:
res.append(
TimePeriod(
weekday=effective_timeperiod.weekday,
start_time=effective_timeperiod.start_time,
end_time=excluded_timeperiod.start_time,
desk=effective_timeperiod.desk,
)
)
if excluded_timeperiod.end_time < effective_timeperiod.end_time:
res.append(
TimePeriod(
weekday=effective_timeperiod.weekday,
start_time=excluded_timeperiod.end_time,
end_time=effective_timeperiod.end_time,
desk=effective_timeperiod.desk,
)
)
def as_weektime_interval(self):
return Interval(WeekTime(self.weekday, self.start_time), WeekTime(self.weekday, self.end_time),)
effective_timeperiods = res
def as_shared_timeperiods(self):
return SharedTimePeriod(
weekday=self.weekday, start_time=self.start_time, end_time=self.end_time, desks=[self.desk],
)
return effective_timeperiods
def get_time_slots(self, min_datetime, max_datetime, meeting_type, base_duration):
meeting_duration = datetime.timedelta(minutes=meeting_type.duration)
@functools.total_ordering
class SharedTimePeriod(object):
'''
Hold common timeperiod for multiple desks.
To improve performance when generating meetings slots for virtual
agendas or agendas with many desks, we deduplicate time-periods between
all desks of all agendas.
Deduplication is based on a common key, and implemented through __eq__
and __lt__ which will be used by itertools.groupby().
(weekday, start_datetime, end_datetime)
it's done in the deduplicate() classmethod.
At the level of gel_all_slots() timeperiod are re-duplicated if the
min_datetime,max_datetime of the desk's agendas differs (see the code
of get_all_slots() for details).
'''
__slots__ = ['weekday', 'start_time', 'end_time', 'desks']
def __init__(self, weekday, start_time, end_time, desks):
self.weekday = weekday
self.start_time = start_time
self.end_time = end_time
self.desks = set(desks)
def __str__(self):
return u'%s / %s%s' % (
force_text(WEEKDAYS[self.weekday]),
date_format(self.start_time, 'TIME_FORMAT'),
date_format(self.end_time, 'TIME_FORMAT'),
)
def __eq__(self, other):
return (self.weekday, self.start_time, self.end_time) == (
other.weekday,
other.start_time,
other.end_time,
)
def __lt__(self, other):
return (self.weekday, self.start_time, self.end_time) < (
other.weekday,
other.start_time,
other.end_time,
)
def get_time_slots(self, min_datetime, max_datetime, meeting_duration, base_duration):
'''Generate all possible time slots between min_datetime and max_datime
of duration meeting_duration minutes and spaced by base_duration
minutes, i.e.
compute a list [a,b] -> [c,d] -> ...
where b-a = meeting_duration and c-a = base_duration.
We start with the first time following min_datetime and being on
the same weekday of the current period.
Then we iterate, advancing by base_duration minutes each time.
If we cross the end_time of the period or end of the current_day
(means end_time is midnight), it advance time to self.start_time on
the next week (same weekday, same start, one week in the future).
When it crosses end_datetime it stops.
Generated start_datetime MUST be in the local timezone as the API
needs it to generate stable ids.
'''
meeting_duration = datetime.timedelta(minutes=meeting_duration)
duration = datetime.timedelta(minutes=base_duration)
real_min_datetime = min_datetime + datetime.timedelta(days=self.weekday - min_datetime.weekday())
if real_min_datetime < min_datetime:
real_min_datetime += datetime.timedelta(days=7)
# make sure datetime is aligned on timezone
# make sure datetime in local timezone, it's ABSOLUTELY necessary
# to have stable event ids in the API.
event_datetime = make_aware(make_naive(real_min_datetime)).replace(
hour=self.start_time.hour, minute=self.start_time.minute, second=0, microsecond=0
)
@ -494,9 +590,15 @@ class TimePeriod(models.Model):
if event_datetime > max_datetime:
break
yield TimeSlot(start_datetime=event_datetime, meeting_type=meeting_type, desk=self.desk)
yield event_datetime
event_datetime = next_time
@classmethod
def from_weektime_interval(cls, weektime_interval, desks=()):
begin, end = weektime_interval
assert begin.weekday == end.weekday
return cls(weekday=begin.weekday, start_time=begin.time, end_time=end.time, desks=desks,)
class MeetingType(models.Model):
agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
@ -755,6 +857,9 @@ class Booking(models.Model):
return new_booking
OpeningHour = collections.namedtuple('OpeningHour', ['begin', 'end'])
class Desk(models.Model):
agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
label = models.CharField(_('Label'), max_length=150)
@ -952,20 +1057,21 @@ class Desk(models.Model):
return total_created
def get_opening_hours(self, date):
openslots = Intervals()
openslots = IntervalSet()
for timeperiod in self.timeperiod_set.filter(weekday=date.weekday()):
start_datetime = make_aware(datetime.datetime.combine(date, timeperiod.start_time))
end_datetime = make_aware(datetime.datetime.combine(date, timeperiod.end_time))
openslots.add(start_datetime, end_datetime)
aware_date = make_aware(datetime.datetime(date.year, date.month, date.day))
exceptions = IntervalSet()
aware_next_date = aware_date + datetime.timedelta(days=1)
for exception in self.timeperiodexception_set.filter(
start_datetime__lt=aware_next_date, end_datetime__gt=aware_date
):
openslots.remove(exception.start_datetime, exception.end_datetime)
exceptions.add(exception.start_datetime, exception.end_datetime)
return openslots.search(aware_date, aware_next_date)
return [OpeningHour(*time_range) for time_range in (openslots - exceptions)]
def ics_directory_path(instance, filename):
@ -1091,3 +1197,7 @@ class TimePeriodException(models.Model):
new_exception.save()
return new_exception
def as_interval(self):
'''Simplify insertion into IntervalSet'''
return Interval(self.start_datetime, self.end_datetime)

View File

@ -14,8 +14,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import defaultdict
import collections
import datetime
import itertools
from django.db import transaction
from django.http import Http404, HttpResponse
@ -23,6 +25,7 @@ from django.shortcuts import get_object_or_404
from django.urls import reverse
from django.utils.dateparse import parse_date
from django.utils.encoding import force_text
from django.utils.formats import date_format
from django.utils.timezone import now, make_aware, localtime
from django.utils.translation import gettext_noop
from django.utils.translation import ugettext_lazy as _
@ -32,23 +35,14 @@ from rest_framework import permissions, serializers, status
from rest_framework.views import APIView
from chrono.api.utils import Response
from ..agendas.models import Agenda, Event, Booking, MeetingType, TimePeriod, Desk
from ..interval import Intervals
from ..agendas.models import Agenda, Event, Booking, MeetingType, TimePeriodException, Desk
from ..interval import IntervalSet
def format_response_datetime(dt):
return localtime(dt).strftime('%Y-%m-%d %H:%M:%S')
def get_exceptions_by_desk(agenda):
exceptions_by_desk = {}
for desk in Desk.objects.filter(agenda=agenda).prefetch_related('timeperiodexception_set'):
exceptions_by_desk[desk.id] = [
(exc.start_datetime, exc.end_datetime) for exc in desk.timeperiodexception_set.all()
]
return exceptions_by_desk
def get_min_datetime(agenda):
if agenda.minimal_booking_delay is None:
return None
@ -64,93 +58,162 @@ def get_max_datetime(agenda):
return max_datetime + datetime.timedelta(days=1)
def get_all_slots(agenda, meeting_type):
min_datetime = get_min_datetime(agenda)
max_datetime = get_max_datetime(agenda)
time_period_filters = {
'min_datetime': min_datetime,
'max_datetime': max_datetime,
'meeting_type': meeting_type,
}
TimeSlot = collections.namedtuple('TimeSlot', ['start_datetime', 'end_datetime', 'full', 'desk'])
def get_all_slots(agenda, meeting_type, unique=False):
'''Get all occupation state of all possible slots for the given agenda (of
its real agendas for a virtual agenda) and the given meeting_type.
The process is done in three phases:
- first phase: aggregate time intervals, during which a meeting is impossible
due to TimePeriodException models, by desk in IntervalSet (compressed
and ordered list of intervals).
- second phase: aggregate time intervals by desk for already booked slots, again
to make IntervalSet,
- third and las phase: generate time slots from each time period based
on the time period definition and on the desk's respective agenda real
min/max_datetime; for each time slot check its status in the exclusion
and bookings sets.
If it is excluded, ingore it completely.
It if is booked, reports the slot as full.
'''
base_agenda = agenda
# virtual agendas have one constraint :
# all the real agendas MUST have the same meetingstypes, the consequence is
# that the base_meeting_duration for the virtual agenda is always the same
# as the base meeting duration of each real agenda.
base_meeting_duration = base_agenda.get_base_meeting_duration()
base_min_datetime = get_min_datetime(base_agenda)
base_max_datetime = get_max_datetime(base_agenda)
meeting_duration = meeting_type.duration
meeting_duration_td = datetime.timedelta(minutes=meeting_duration)
base_date = now().date()
agendas = agenda.get_real_agendas()
base_agenda = agenda
open_slots = {}
time_periods_by_agenda = {}
# regroup agendas by their opening period
agenda_ids_by_min_max_datetimes = collections.defaultdict(set)
agenda_id_min_max_datetime = {}
for agenda in agendas:
open_slots[agenda.id] = defaultdict(lambda: Intervals())
time_periods_by_agenda[agenda.id] = []
used_min_datetime = base_min_datetime or get_min_datetime(agenda)
used_max_datetime = base_max_datetime or get_max_datetime(agenda)
agenda_ids_by_min_max_datetimes[(used_min_datetime, used_max_datetime)].add(agenda.id)
agenda_id_min_max_datetime[agenda.id] = (used_min_datetime, used_max_datetime)
# preload time periods
for period in TimePeriod.objects.filter(desk__agenda__in=agendas).prefetch_related('desk__agenda'):
time_periods_by_agenda[period.desk.agenda_id].append(period)
# aggregate time period exceptions by desk as IntervalSet for fast querying
# 1. sort exceptions by start_datetime
# 2. group them by desk
# 3. convert each desk's list of exception to intervals then IntervalSet
desks_exceptions = {
time_period_desk: IntervalSet.from_ordered(
map(TimePeriodException.as_interval, time_period_exceptions)
)
for time_period_desk, time_period_exceptions in itertools.groupby(
TimePeriodException.objects.filter(desk__agenda__in=agendas).order_by(
'desk', 'start_datetime', 'end_datetime'
),
key=lambda time_period: time_period.desk,
)
}
# compute reduced min/max_datetime windows by desks based on exclusions
desk_min_max_datetime = {}
for desk, desk_exception in desks_exceptions.items():
base = IntervalSet([agenda_id_min_max_datetime[desk.agenda_id]])
base = base - desk_exception
min_datetime = base.min().replace(hour=0, minute=0, second=0, microsecond=0)
max_datetime = base.max()
start_of_day = max_datetime.replace(hour=0, minute=0, second=0, microsecond=0)
# move to end of the day if max_datetime is not on a day boundary
if max_datetime != start_of_day:
max_datetime = start_of_day + datetime.timedelta(days=1)
desk_min_max_datetime[desk] = (min_datetime, max_datetime)
base_agenda_excluded_timeperiods = base_agenda.excluded_timeperiods.all()
for agenda in agendas:
base_duration = agenda.get_base_meeting_duration()
used_time_period_filters = time_period_filters.copy()
if used_time_period_filters['min_datetime'] is None:
used_time_period_filters['min_datetime'] = get_min_datetime(agenda)
if used_time_period_filters['max_datetime'] is None:
used_time_period_filters['max_datetime'] = get_max_datetime(agenda)
for raw_time_period in time_periods_by_agenda[agenda.id]:
for time_period in raw_time_period.get_effective_timeperiods(base_agenda_excluded_timeperiods):
duration = (
datetime.datetime.combine(base_date, time_period.end_time)
- datetime.datetime.combine(base_date, time_period.start_time)
).seconds / 60
if duration < meeting_type.duration:
# skip time period that can't even hold a single meeting
continue
for slot in time_period.get_time_slots(
base_duration=base_duration, **used_time_period_filters
):
slot.full = False
open_slots[agenda.id][time_period.desk_id].add(
slot.start_datetime, slot.end_datetime, slot
)
# remove excluded slot
for agenda in agendas:
excluded_slot_by_desk = get_exceptions_by_desk(agenda)
for desk_id, excluded_interval in excluded_slot_by_desk.items():
for interval in excluded_interval:
begin, end = interval
open_slots[agenda.id][desk_id].remove_overlap(localtime(begin), localtime(end))
for agenda in agendas:
used_min_datetime = min_datetime
if used_min_datetime is None:
used_min_datetime = get_min_datetime(agenda)
used_max_datetime = max_datetime
if used_max_datetime is None:
used_max_datetime = get_max_datetime(agenda)
for event in (
agenda.event_set.filter(
agenda=agenda,
# aggregate already booked time intervals by desk
bookings = {}
for (used_min_datetime, used_max_datetime), agenda_ids in agenda_ids_by_min_max_datetimes.items():
booked_events = (
Event.objects.filter(
agenda__in=agenda_ids,
start_datetime__gte=used_min_datetime,
start_datetime__lte=used_max_datetime + datetime.timedelta(meeting_type.duration),
start_datetime__lte=used_max_datetime + meeting_duration_td,
)
.select_related('meeting_type')
.exclude(booking__cancellation_datetime__isnull=False)
):
for slot in open_slots[agenda.id][event.desk_id].search_data(
event.start_datetime, event.end_datetime
):
slot.full = True
# ordering is important for the later groupby, it works like sort | uniq
.order_by('desk_id', 'start_datetime')
.values_list('desk_id', 'start_datetime', 'meeting_type__duration')
)
# compute exclusion set by desk from all bookings, using
# itertools.groupby() to group them by desk_id
bookings.update(
(
desk_id,
IntervalSet.from_ordered(
(event_start_datetime, event_start_datetime + datetime.timedelta(minutes=event_duration))
for desk_id, event_start_datetime, event_duration in values
),
)
for desk_id, values in itertools.groupby(booked_events, lambda be: be[0])
)
slots = []
for agenda in agendas:
for desk_id in open_slots[agenda.id]:
slots.extend(open_slots[agenda.id][desk_id].iter_data())
slots.sort(key=lambda slot: slot.start_datetime)
return slots
unique_booked = {}
for time_period in base_agenda.get_effective_time_periods():
duration = (
datetime.datetime.combine(base_date, time_period.end_time)
- datetime.datetime.combine(base_date, time_period.start_time)
).seconds / 60
if duration < meeting_type.duration:
# skip time period that can't even hold a single meeting
continue
desks_by_min_max_datetime = {
datetime_range: list(desks)
for datetime_range, desks in itertools.groupby(
time_period.desks,
key=lambda desk: desk_min_max_datetime.get(desk, agenda_id_min_max_datetime[desk.agenda_id]),
)
}
# aggregate agendas based on their real min/max_datetime :
# the get_time_slots() result is dependant upon these values, so even
# if we deduplicated a TimePeriod for some desks, if their respective
# agendas have different real min/max_datetime we must unduplicate them
# at time slot generation phase.
for (used_min_datetime, used_max_datetime), desks in desks_by_min_max_datetime.items():
for start_datetime in time_period.get_time_slots(
min_datetime=used_min_datetime,
max_datetime=used_max_datetime,
meeting_duration=meeting_duration,
base_duration=base_meeting_duration,
):
end_datetime = start_datetime + meeting_duration_td
timestamp = start_datetime.timestamp
# skip generating datetimes if we already know that this
# datetime is available
if unique and unique_booked.get(timestamp) is False:
continue
for desk in sorted(desks, key=lambda desk: desk.label):
# ignore the slot for this desk, if it overlaps and exclusion period for this desk
excluded = desk in desks_exceptions and desks_exceptions[desk].overlaps(
start_datetime, end_datetime
)
if excluded:
continue
# slot is full if an already booked event overlaps it
booked = desk.id in bookings and bookings[desk.id].overlaps(start_datetime, end_datetime)
if unique and unique_booked.get(timestamp) is booked:
continue
unique_booked[timestamp] = booked
yield TimeSlot(
start_datetime=start_datetime, end_datetime=end_datetime, desk=desk, full=booked
)
if unique and not booked:
break
def get_agenda_detail(request, agenda):
@ -337,16 +400,29 @@ class MeetingDatetimes(APIView):
now_datetime = now()
slots = get_all_slots(agenda, meeting_type)
entries = {}
for slot in slots:
if slot.start_datetime < now_datetime:
continue
key = (slot.start_datetime, slot.end_datetime)
if key in entries and slot.full:
continue
entries[key] = slot
slots = sorted(entries.values(), key=lambda x: x.start_datetime)
# Generate an unique slot for each possible meeting [start_datetime,
# end_datetime] range.
# First use get_all_slots() to get each possible meeting by desk and
# its current status (full = booked, or not).
# Then order them by (start, end, full) where full is False for
# bookable slot, so bookable slot come first.
# Traverse them and remove duplicates, if a slot is bookable we will
# only see it (since it comes first), so it also remove "full/booked"
# slot from the list if there is still a bookable slot on a desk at the
# same time.
# The generator also remove slots starting before the current time.
def unique_slots():
last_slot = None
all_slots = list(get_all_slots(agenda, meeting_type, unique=True))
for slot in sorted(all_slots, key=lambda slot: slot[:3]):
if slot.start_datetime < now_datetime:
continue
if last_slot and last_slot[:2] == slot[:2]:
continue
last_slot = slot
yield slot
generator_of_unique_slots = unique_slots()
# create fillslot API URL as a template, to avoid expensive calls
# to request.build_absolute_uri()
@ -358,16 +434,28 @@ class MeetingDatetimes(APIView):
)
)
def make_id(start_datetime, meeting_type):
'''Make virtual id for a slot, combining meeting_type.id and
iso-format of date and time.
!!! The datetime must always be in the local timezone and the local
timezone must not change if we want the id to be stable.
It MUST be a garanty of SharedTimePeriod.get_time_slots(),
!!!
'''
return '%s:%s' % (meeting_type.id or meeting_type.slug, start_datetime.strftime('%Y-%m-%d-%H%M'),)
response = {
'data': [
{
'id': x.id,
'datetime': format_response_datetime(x.start_datetime),
'text': force_text(x),
'disabled': bool(x.full),
'api': {'fillslot_url': fillslot_url.replace(fake_event_identifier, str(x.id)),},
'id': slot_id,
'datetime': format_response_datetime(slot.start_datetime),
'text': date_format(slot.start_datetime, format='DATETIME_FORMAT'),
'disabled': bool(slot.full),
'api': {'fillslot_url': fillslot_url.replace(fake_event_identifier, slot_id),},
}
for x in slots
for slot in generator_of_unique_slots
# we do not have the := operator, so we do that
for slot_id in [make_id(slot.start_datetime, meeting_type)]
]
}
return Response(response)
@ -607,7 +695,10 @@ class Fillslots(APIView):
# get all free slots and separate them by desk
try:
all_slots = get_all_slots(agenda, agenda.get_meetingtype(id_=meeting_type_id))
all_slots = sorted(
get_all_slots(agenda, agenda.get_meetingtype(id_=meeting_type_id)),
key=lambda slot: slot.start_datetime,
)
except (MeetingType.DoesNotExist, ValueError):
return Response(
{
@ -619,7 +710,7 @@ class Fillslots(APIView):
)
all_free_slots = [slot for slot in all_slots if not slot.full]
datetimes_by_desk = defaultdict(set)
datetimes_by_desk = collections.defaultdict(set)
for slot in all_free_slots:
datetimes_by_desk[slot.desk.id].add(slot.start_datetime)
@ -627,7 +718,7 @@ class Fillslots(APIView):
if agenda.kind == 'virtual':
# Compute fill_rate by agenda/date
fill_rates = defaultdict(dict)
fill_rates = collections.defaultdict(dict)
for slot in all_slots:
ref_date = slot.start_datetime.date()
if ref_date not in fill_rates[slot.desk.agenda]:

View File

@ -14,173 +14,194 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import bisect
Interval = collections.namedtuple('Interval', ['begin', 'end'])
class Interval(object):
__slots__ = ['begin', 'end', 'data']
def __init__(self, begin, end, data=None):
assert begin < end
self.begin = begin
self.end = end
self.data = data
class IntervalSet(object):
'''Store a set made of an union of disjoint open/closed intervals (it
currently does not really care about their openness), i.e.
def __eq__(self, other):
return self.begin == other.begin and self.end == other.end and self.data == other.data
S = [a[0], b[0]] [a[n-1], b[n-1]]
def overlap(self, begin, end):
if end <= self.begin:
return False
if begin >= self.end:
return False
return True
where
def contains(self, point):
return self.begin < point < self.end
forall i < n. a_i < b_i
forall i < (n-1). b_i < a[i+1]
'''
__slots__ = ['begin', 'end']
def __init__(self, iterable=(), already_sorted=False):
'''
Initialize a new IntervalSet from a list of Interval or 2-tuple.
Iterable will be sorted, if it's already sorted use the
from_ordered() classmethod.
It's faster than using add() because intervals are merged as we
traverse the list, and self.begin and self.end are built in O(n)
time where len(iterable) = n.
'''
if not already_sorted:
iterable = sorted(iterable)
self.begin = []
self.end = []
last_begin, last_end = None, None
for begin, end in iterable:
# check good order property along the way
if last_begin and not (last_begin < begin or last_begin == begin and last_end <= end):
raise ValueError('not well ordered: ! %s <= %s' % ((last_begin, last_end), (begin, end)))
if self.begin and begin <= self.end[-1]:
self.end[-1] = max(self.end[-1], end)
else:
self.begin.append(begin)
self.end.append(end)
last_begin, last_end = begin, end
@classmethod
def simple(cls, begin, end=Ellipsis):
begin, end = cls._begin_or_interval(begin, end)
return cls.from_ordered([(begin, end)])
@classmethod
def from_ordered(cls, iterable):
return cls(iterable, already_sorted=True)
def __repr__(self):
return '<Interval [%s, %s] %s>' % (self.begin, self.end, self.data or '')
return repr(list(map(tuple, self)))
@classmethod
def _begin_or_interval(cls, begin, end=Ellipsis):
if end is Ellipsis:
return begin
else:
return begin, end
class Intervals(object):
'''Maintain a list of mostly non overlapping intervals, allow removing overlap.
def add(self, begin, end=Ellipsis):
'''Add a new interval to the set, eventually merging it with actual
ones.
Intervals are indexed by extremums, an interval is also added to the
list of all extremums contained inside the interval.
Exemple of set: a = [1, 10], b = [2, 4], c = [3, 5], d = [8, 9]
Structure, as an ordered dict of the endpoints:
{
1: [a],
2: [a, b],
3: [a, b, c],
4: [a, b, c],
5: [a, c],
8: [a, d],
9: [a, d],
10: [a],
}
'''
def __init__(self):
self.points = []
self.container = []
def __insert_point(self, point, interval):
'''Insert interval in container for this point and if the point is new
copy all intervals from the previous container containing this point.
It uses bisect_left() to maintaint the ordering of intervals.
'''
i = bisect.bisect_left(self.points, point)
if i >= len(self.container) or self.points[i] != point:
self.points.insert(i, point)
self.container.insert(i, [])
if i:
for itv in self.container[i - 1]:
if itv.contains(point):
self.container[i].append(itv)
self.container[i].append(interval)
return i
def add(self, begin, end, data=None):
'Add an interval'
self.add_interval(Interval(begin, end, data))
def add_interval(self, interval):
'Add an interval object'
a = self.__insert_point(interval.begin, interval)
b = self.__insert_point(interval.end, interval)
for i in range(a + 1, b):
self.container[i].append(interval)
def __iter_interval(self, begin, end, modify=False):
'''Search for overlapping intervals by bisecting over the list of
interval endpoints and iterating until a point after the greatest
extremum of the search interval.
We test the first point after the end of the searched interval
because if the searched interval is completely included in one of
the interval, this interval will be part of the nearest point
greater than the end point of the searched interval.
Prevent returning an interval multiple times by creating an already
seen set of intervals.
'''
seen = set()
i = bisect.bisect_left(self.points, begin)
while i < len(self.points):
container = self.container[i]
if modify:
container = list(container)
for interval in container:
if id(interval) in seen:
continue
seen.add(id(interval))
yield self.points[i], interval
if not self.points[i] <= end:
begin, end = self._begin_or_interval(begin, end)
# insert an interval by merging with previous and following intervals
# if they overlap
i = bisect.bisect_left(self.begin, begin)
# merge with previous intervals
while 0 < i and begin <= self.end[i - 1]:
# [begin, end] overlaps previous interval
# so remove it to merge them.
previous_begin = self.begin.pop(i - 1)
previous_end = self.end.pop(i - 1)
i = i - 1
if previous_begin < begin:
# but it does not include it, so replace if begin by previous.begin
begin = previous_begin
if end < previous_end:
# [begin, end] is completely included in previous interval,
# replace end by previous.end
end = previous_end
break
i += 1
# merge with following
while i < len(self.begin) and self.begin[i] <= end:
# [begin, end] overlaps next interval, so remove it to merge them.
next_end = self.end.pop(i)
self.begin.pop(i)
if end < next_end:
# but it does not include it, so replace end by next.end
end = next_end
# no symetry with the previous "while" loop as .bisect_left()
# garanty that begin is left or equal to next.begin.
break
self.begin.insert(i, begin)
self.end.insert(i, end)
def remove(self, begin, end):
'Substract interval'
for interval in list(self.iter()):
# create interval with new borders
if interval.overlap(begin, end):
if begin > interval.begin and end < interval.end:
self.add(interval.begin, begin)
self.add(end, interval.end)
elif interval.begin < begin:
self.add(interval.begin, begin)
elif interval.end > end:
self.add(end, interval.end)
self.__remove_interval(interval)
def remove_overlap(self, begin, end):
'Remove all overlapping intervals'
for point, interval in self.__iter_interval(begin, end, modify=True):
if interval.overlap(begin, end):
self.__remove_interval(interval)
def overlap(self, begin, end):
'Test if some intervals overlap'
for point, interval in self.__iter_interval(begin, end):
if interval.overlap(begin, end):
return True
def overlaps(self, begin, end=Ellipsis):
'''
Find if the [begin, end] has a non-empty (or closed) overlaps with
one of the contained intervals.
'''
begin, end = self._begin_or_interval(begin, end)
# look for non-zero size overlap
i = bisect.bisect_left(self.begin, begin)
if 0 < i and begin < self.end[i - 1]:
return True
if i < len(self.begin) and self.begin[i] < end:
return True
return False
def search(self, begin, end):
'Search overlapping intervals'
for point, interval in self.__iter_interval(begin, end):
if interval.overlap(begin, end):
# prevent returning the same interval twice
if interval.begin < begin or interval.begin == point:
yield interval
def search_data(self, begin, end):
'Search data elements of overlapping intervals'
for interval in self.search(begin, end):
yield interval.data
def iter(self):
'Iterate intervals'
if not self.points:
return []
return self.search(self.points[0], self.points[-1])
def iter_data(self):
'Iterate data element attached to intervals'
for interval in self.iter():
yield interval.data
def __remove_interval(self, interval):
'''Remove the interval from the container of its extremum points and
all containers between.
def __iter__(self):
'''
a = bisect.bisect_left(self.points, interval.begin)
b = bisect.bisect_left(self.points, interval.end)
# check some invariants
assert self.points[a] == interval.begin
assert self.points[b] == interval.end
for i in range(a, b + 1):
self.container[i].remove(interval)
Generate the ordered list of included intervals as 2-tuples.
'''
return map(Interval._make, zip(self.begin, self.end))
def __eq__(self, other):
if isinstance(other, self.__class__):
return list(self) == list(other)
if hasattr(other, '__iter__'):
return self == self.cast(other)
return False
def __bool__(self):
return bool(self.begin)
@classmethod
def cast(cls, value):
if isinstance(value, cls):
return value
return cls(value)
def __sub__(self, other):
l1 = iter(self)
l2 = iter(self.cast(other))
def gen():
state = 3
while True:
if state & 1:
c1 = next(l1, None)
if state & 2:
c2 = next(l2, None)
if not c1:
break
if not c2:
yield c1
for c1 in l1:
yield c1
break
if c1[1] <= c2[0]:
yield c1
state = 1
elif c2[1] <= c1[0]:
state = 2
elif c1[1] < c2[1]:
if c1[0] < c2[0]:
yield (c1[0], c2[0])
state = 1
elif c2[1] < c1[1]:
if c1[0] < c2[0]:
yield (c1[0], c2[0])
c1 = (c2[1], c1[1])
state = 2
elif c1[1] == c2[1]:
if c1[0] < c2[0]:
yield (c1[0], c2[0])
state = 3
else:
raise Exception('not reachable')
return self.__class__.from_ordered(gen())
def min(self):
if self:
return self.begin[0]
return None
def max(self):
if self:
return self.end[-1]
return None

View File

@ -587,11 +587,11 @@ class AgendaMonthView(AgendaDateView, MonthArchiveView):
# get desks opening hours on last period iteration
if period == max_date:
for hour in desk.get_opening_hours(current_date):
for opening_hour in desk.get_opening_hours(current_date):
timetable['infos']['opening_hours'].append(
{
'css_top': 100 * (hour.begin - current_date).seconds // 3600,
'css_height': 100 * (hour.end - hour.begin).seconds // 3600,
'css_top': 100 * (opening_hour.begin - current_date).seconds // 3600,
'css_height': 100 * (opening_hour.end - opening_hour.begin).seconds // 3600,
'css_width': width,
'css_left': left,
}

View File

@ -597,93 +597,111 @@ def test_virtual_agenda_base_meeting_duration():
assert virt_agenda.get_base_meeting_duration() == 60
def test_get_effective_timeperiods():
time_period = TimePeriod(weekday=0, start_time=datetime.time(10, 0), end_time=datetime.time(18, 0))
def test_agenda_get_effective_time_periods(db):
real_agenda = Agenda.objects.create(label='Real Agenda', kind='meetings')
meeting_type = MeetingType.objects.create(agenda=real_agenda, label='MT1')
desk = Desk.objects.create(label='Real Agenda Desk1', agenda=real_agenda)
time_period = TimePeriod.objects.create(
weekday=0, start_time=datetime.time(10, 0), end_time=datetime.time(18, 0), desk=desk
)
virtual_agenda = Agenda.objects.create(label='Virtual Agenda', kind='virtual')
VirtualMember.objects.create(virtual_agenda=virtual_agenda, real_agenda=real_agenda)
# empty exclusion set
effective_timeperiods = time_period.get_effective_timeperiods(TimePeriod.objects.none())
assert len(effective_timeperiods) == 1
effective_timeperiod = effective_timeperiods[0]
assert effective_timeperiod.weekday == time_period.weekday
assert effective_timeperiod.start_time == time_period.start_time
assert effective_timeperiod.end_time == time_period.end_time
common_timeperiods = list(virtual_agenda.get_effective_time_periods())
assert len(common_timeperiods) == 1
common_timeperiod = common_timeperiods[0]
assert common_timeperiod.weekday == time_period.weekday
assert common_timeperiod.start_time == time_period.start_time
assert common_timeperiod.end_time == time_period.end_time
# exclusions are on a different day
excluded_timeperiods = [
TimePeriod(weekday=1, start_time=datetime.time(17, 0), end_time=datetime.time(18, 0)),
TimePeriod(weekday=2, start_time=datetime.time(17, 0), end_time=datetime.time(18, 0)),
]
effective_timeperiods = time_period.get_effective_timeperiods(excluded_timeperiods)
assert len(effective_timeperiods) == 1
effective_timeperiod = effective_timeperiods[0]
assert effective_timeperiod.weekday == time_period.weekday
assert effective_timeperiod.start_time == time_period.start_time
assert effective_timeperiod.end_time == time_period.end_time
def exclude_time_periods(time_periods):
virtual_agenda.excluded_timeperiods.clear()
for time_period in time_periods:
time_period.agenda = virtual_agenda
time_period.save()
exclude_time_periods(
[
TimePeriod(weekday=1, start_time=datetime.time(17, 0), end_time=datetime.time(18, 0)),
TimePeriod(weekday=2, start_time=datetime.time(17, 0), end_time=datetime.time(18, 0)),
]
)
common_timeperiods = list(virtual_agenda.get_effective_time_periods())
assert len(common_timeperiods) == 1
common_timeperiod = common_timeperiods[0]
assert common_timeperiod.weekday == time_period.weekday
assert common_timeperiod.start_time == time_period.start_time
assert common_timeperiod.end_time == time_period.end_time
# one exclusion, end_time should be earlier
excluded_timeperiods = [
TimePeriod(weekday=0, start_time=datetime.time(17, 0), end_time=datetime.time(18, 0))
]
effective_timeperiods = time_period.get_effective_timeperiods(excluded_timeperiods)
assert len(effective_timeperiods) == 1
effective_timeperiod = effective_timeperiods[0]
assert effective_timeperiod.weekday == time_period.weekday
assert effective_timeperiod.start_time == datetime.time(10, 0)
assert effective_timeperiod.end_time == datetime.time(17, 0)
exclude_time_periods(
[TimePeriod(weekday=0, start_time=datetime.time(17, 0), end_time=datetime.time(18, 0))]
)
common_timeperiods = list(virtual_agenda.get_effective_time_periods())
assert len(common_timeperiods) == 1
common_timeperiod = common_timeperiods[0]
assert common_timeperiod.weekday == time_period.weekday
assert common_timeperiod.start_time == datetime.time(10, 0)
assert common_timeperiod.end_time == datetime.time(17, 0)
# one exclusion, start_time should be later
excluded_timeperiods = [
TimePeriod(weekday=0, start_time=datetime.time(10, 0), end_time=datetime.time(16, 0))
]
effective_timeperiods = time_period.get_effective_timeperiods(excluded_timeperiods)
assert len(effective_timeperiods) == 1
effective_timeperiod = effective_timeperiods[0]
assert effective_timeperiod.weekday == time_period.weekday
assert effective_timeperiod.start_time == datetime.time(16, 0)
assert effective_timeperiod.end_time == datetime.time(18, 0)
exclude_time_periods(
[TimePeriod(weekday=0, start_time=datetime.time(10, 0), end_time=datetime.time(16, 0))]
)
common_timeperiods = list(virtual_agenda.get_effective_time_periods())
assert len(common_timeperiods) == 1
common_timeperiod = common_timeperiods[0]
assert common_timeperiod.weekday == time_period.weekday
assert common_timeperiod.start_time == datetime.time(16, 0)
assert common_timeperiod.end_time == datetime.time(18, 0)
# one exclusion, splits effective timeperiod in two
excluded_timeperiods = [
TimePeriod(weekday=0, start_time=datetime.time(12, 0), end_time=datetime.time(16, 0))
]
effective_timeperiods = time_period.get_effective_timeperiods(excluded_timeperiods)
assert len(effective_timeperiods) == 2
effective_timeperiod = effective_timeperiods[0]
assert effective_timeperiod.weekday == time_period.weekday
assert effective_timeperiod.start_time == datetime.time(10, 0)
assert effective_timeperiod.end_time == datetime.time(12, 0)
effective_timeperiod = effective_timeperiods[1]
assert effective_timeperiod.weekday == time_period.weekday
assert effective_timeperiod.start_time == datetime.time(16, 0)
assert effective_timeperiod.end_time == datetime.time(18, 0)
exclude_time_periods(
[TimePeriod(weekday=0, start_time=datetime.time(12, 0), end_time=datetime.time(16, 0))]
)
common_timeperiods = list(virtual_agenda.get_effective_time_periods())
assert len(common_timeperiods) == 2
common_timeperiod = common_timeperiods[0]
assert common_timeperiod.weekday == time_period.weekday
assert common_timeperiod.start_time == datetime.time(10, 0)
assert common_timeperiod.end_time == datetime.time(12, 0)
common_timeperiod = common_timeperiods[1]
assert common_timeperiod.weekday == time_period.weekday
assert common_timeperiod.start_time == datetime.time(16, 0)
assert common_timeperiod.end_time == datetime.time(18, 0)
# several exclusion, splits effective timeperiod into pieces
excluded_timeperiods = [
TimePeriod(weekday=0, start_time=datetime.time(12, 0), end_time=datetime.time(13, 0)),
TimePeriod(weekday=0, start_time=datetime.time(10, 30), end_time=datetime.time(11, 30)),
TimePeriod(weekday=0, start_time=datetime.time(16, 30), end_time=datetime.time(17, 00)),
]
effective_timeperiods = time_period.get_effective_timeperiods(excluded_timeperiods)
assert len(effective_timeperiods) == 4
exclude_time_periods(
[
TimePeriod(weekday=0, start_time=datetime.time(12, 0), end_time=datetime.time(13, 0)),
TimePeriod(weekday=0, start_time=datetime.time(10, 30), end_time=datetime.time(11, 30)),
TimePeriod(weekday=0, start_time=datetime.time(16, 30), end_time=datetime.time(17, 00)),
]
)
common_timeperiods = list(virtual_agenda.get_effective_time_periods())
assert len(common_timeperiods) == 4
effective_timeperiod = effective_timeperiods[0]
assert effective_timeperiod.weekday == time_period.weekday
assert effective_timeperiod.start_time == datetime.time(10, 0)
assert effective_timeperiod.end_time == datetime.time(10, 30)
common_timeperiod = common_timeperiods[0]
assert common_timeperiod.weekday == time_period.weekday
assert common_timeperiod.start_time == datetime.time(10, 0)
assert common_timeperiod.end_time == datetime.time(10, 30)
effective_timeperiod = effective_timeperiods[1]
assert effective_timeperiod.weekday == time_period.weekday
assert effective_timeperiod.start_time == datetime.time(11, 30)
assert effective_timeperiod.end_time == datetime.time(12, 0)
common_timeperiod = common_timeperiods[1]
assert common_timeperiod.weekday == time_period.weekday
assert common_timeperiod.start_time == datetime.time(11, 30)
assert common_timeperiod.end_time == datetime.time(12, 0)
effective_timeperiod = effective_timeperiods[2]
assert effective_timeperiod.weekday == time_period.weekday
assert effective_timeperiod.start_time == datetime.time(13, 0)
assert effective_timeperiod.end_time == datetime.time(16, 30)
common_timeperiod = common_timeperiods[2]
assert common_timeperiod.weekday == time_period.weekday
assert common_timeperiod.start_time == datetime.time(13, 0)
assert common_timeperiod.end_time == datetime.time(16, 30)
effective_timeperiod = effective_timeperiods[3]
assert effective_timeperiod.weekday == time_period.weekday
assert effective_timeperiod.start_time == datetime.time(17, 0)
assert effective_timeperiod.end_time == datetime.time(18, 0)
common_timeperiod = common_timeperiods[3]
assert common_timeperiod.weekday == time_period.weekday
assert common_timeperiod.start_time == datetime.time(17, 0)
assert common_timeperiod.end_time == datetime.time(18, 0)
def test_desk_duplicate():

View File

@ -2995,7 +2995,7 @@ def test_virtual_agendas_meetings_datetimes_multiple_agendas(app, time_zone, moc
with CaptureQueriesContext(connection) as ctx:
resp = app.get(api_url)
assert len(resp.json['data']) == 12
assert len(ctx.captured_queries) == 16
assert len(ctx.captured_queries) == 11
# simulate booking
dt = datetime.datetime.strptime(resp.json['data'][2]['id'].split(':')[1], '%Y-%m-%d-%H%M')

View File

@ -1,93 +1,87 @@
import pytest
from chrono.interval import Interval, Intervals
from chrono.interval import Interval, IntervalSet
def test_interval_repr():
a = Interval(1, 4)
repr(a)
def test_interval_set_merge_adjacent():
'''
Test that adjacent intervals are merged
'''
s = IntervalSet()
for i in range(0, 100, 2):
s.add(i, i + 1)
for i in range(1, 100, 2):
s.add(Interval(i, i + 1))
s.add((11, 12))
assert list(s) == [(0, 100)]
def test_interval_overlap():
a = Interval(1, 4)
assert not a.overlap(0, 1)
assert a.overlap(0, 2)
assert a.overlap(1, 4)
assert a.overlap(2, 3)
assert a.overlap(3, 5)
assert not a.overlap(5, 6)
def test_interval_set_from_ordered():
assert IntervalSet.from_ordered([(0, 10), (1, 9), (2, 13), (13, 20)]) == [(0, 20)]
with pytest.raises(ValueError):
IntervalSet.from_ordered([(3, 4), (1, 2)])
def test_intervals():
intervals = Intervals()
assert len(list(intervals.search(0, 5))) == 0
for i in range(10):
intervals.add(i, i + 1, 1)
for i in range(10, 20):
intervals.add(i, i + 1, 2)
for i in range(5, 15):
intervals.add(i, i + 1, 3)
assert len(list(intervals.search(0, 5))) == 5
assert len(list(intervals.search(0, 10))) == 15
assert len(list(intervals.search(5, 15))) == 20
assert len(list(intervals.search(10, 20))) == 15
assert len(list(intervals.search(15, 20))) == 5
assert set(intervals.search_data(0, 5)) == {1}
assert set(intervals.search_data(0, 10)) == {1, 3}
assert set(intervals.search_data(5, 15)) == {1, 2, 3}
assert set(intervals.search_data(10, 20)) == {2, 3}
assert set(intervals.search_data(15, 20)) == {2}
for i in range(20):
assert intervals.overlap(i, i + 1)
intervals.remove_overlap(5, 15)
assert set(intervals.search_data(0, 20)) == {1, 2}
for i in range(5):
assert intervals.overlap(i, i + 1)
for i in range(5, 15):
assert not intervals.overlap(i, i + 1)
for i in range(15, 20):
assert intervals.overlap(i, i + 1)
def test_interval_set_overlaps():
'''
Test overlaps() works.
'''
s = IntervalSet()
assert not s.overlaps(0, 1)
for i in range(0, 100, 2):
s.add(i, i + 1)
i = -0.5
while i < 99:
assert s.overlaps(i, i + 1)
assert s.overlaps(i + 1, i + 2)
i += 2
def test_interval_remove():
intervals = Intervals()
intervals.remove(10, 11) # do nothing
intervals.add(9, 12)
intervals.add(14, 17)
intervals.remove(11, 24)
assert list(intervals.search(0, 24)) == [Interval(9, 11)]
intervals.remove(8, 10)
assert list(intervals.search(0, 24)) == [Interval(10, 11)]
intervals = Intervals()
intervals.add(9, 12)
intervals.add(14, 17)
intervals.remove(10, 11)
assert list(intervals.search(0, 24)) == [Interval(9, 10), Interval(11, 12), Interval(14, 17)]
def test_interval_set_add_and_from_ordered_equivalence():
s1 = IntervalSet()
for i in range(0, 100, 2):
s1.add(i, i + 1)
s2 = IntervalSet.from_ordered((i, i + 1) for i in range(0, 100, 2))
assert s1 == s2
def test_doc_test():
a = Interval(1, 10)
b = Interval(2, 4)
c = Interval(3, 5)
d = Interval(8, 9)
def test_interval_set_from_ordered_merge_adjacent():
s = IntervalSet.from_ordered((i, i + 1) for i in range(0, 100))
assert list(s) == [(0, 100)]
s = Intervals()
for x in [a, b, c, d]:
s.add_interval(x)
assert sorted(s.search(2, 9), key=lambda x: x.begin) == [a, b, c, d]
assert sorted(s.search(-1, 11), key=lambda x: x.begin) == [a, b, c, d]
assert sorted(s.search(1, 3), key=lambda x: x.begin) == [a, b]
assert sorted(s.search(4, 10), key=lambda x: x.begin) == [a, c, d]
assert sorted(s.search(5, 10), key=lambda x: x.begin) == [a, d]
def test_invterval_cast():
s = IntervalSet([(1, 2)])
assert IntervalSet.cast(s) == [(1, 2)]
assert IntervalSet.cast(((1, 2),)) == [(1, 2)]
assert IntervalSet.cast([[1, 2]]) == [(1, 2)]
assert IntervalSet.cast((Interval(1, 2),)) == [(1, 2)]
def test_interval_set_sub():
s = IntervalSet([(0, 3), (4, 7), (8, 11), (12, 15)])
assert (s - [(-2, -1), (1, 2), (3, 5), (10, 11.5), (11.5, 15.5)]) == [(0, 1), (2, 3), (5, 7), (8, 10)]
assert (s - [(-2, -1), (1, 2), (3, 5), (10, 11.5), (11.5, 15.5)]) == [(0, 1), (2, 3), (5, 7), (8, 10)]
assert (s - []) == s
assert (IntervalSet([(0, 2)]) - [(1, 2)]) == [(0, 1)]
def test_interval_set_min_max():
assert IntervalSet().min() is None
assert IntervalSet().max() is None
assert IntervalSet.simple(1, 2).min() == 1
assert IntervalSet.simple(1, 2).max() == 2
def test_interval_set_repr():
assert repr(IntervalSet([(1, 2)])) == repr([(1, 2)])
def test_interval_set_eq():
assert IntervalSet([(1, 2)]) == [(1, 2)]
assert [(1, 2)] == IntervalSet([(1, 2)])
assert not IntervalSet([(1, 2)]) == None
assert not None == IntervalSet([(1, 2)])

View File

@ -10,5 +10,6 @@ def metz_data(db):
def test_get_all_slots(db, app, metz_data, freezer):
freezer.move_to('2020-04-29 22:04:00')
response = app.get('/api/agenda/vdmvirtuel-retraitmasque/meetings/un-retrait/datetimes/')
assert len(response.json['data']) == 324

View File

@ -21,79 +21,79 @@ def test_timeperiod_time_slots():
timeperiod = TimePeriod(
desk=desk, weekday=0, start_time=datetime.time(9, 0), end_time=datetime.time(12, 0)
)
events = timeperiod.get_time_slots(
events = timeperiod.as_shared_timeperiods().get_time_slots(
min_datetime=make_aware(datetime.datetime(2016, 9, 1)),
max_datetime=make_aware(datetime.datetime(2016, 10, 1)),
meeting_type=meeting_type,
meeting_duration=meeting_type.duration,
base_duration=agenda.get_base_meeting_duration(),
)
events = list(sorted(events, key=lambda x: x.start_datetime))
assert events[0].start_datetime.timetuple()[:5] == (2016, 9, 5, 9, 0)
assert events[1].start_datetime.timetuple()[:5] == (2016, 9, 5, 10, 0)
assert events[2].start_datetime.timetuple()[:5] == (2016, 9, 5, 11, 0)
assert events[3].start_datetime.timetuple()[:5] == (2016, 9, 12, 9, 0)
assert events[4].start_datetime.timetuple()[:5] == (2016, 9, 12, 10, 0)
assert events[-1].start_datetime.timetuple()[:5] == (2016, 9, 26, 11, 0)
events = sorted(events)
assert events[0].timetuple()[:5] == (2016, 9, 5, 9, 0)
assert events[1].timetuple()[:5] == (2016, 9, 5, 10, 0)
assert events[2].timetuple()[:5] == (2016, 9, 5, 11, 0)
assert events[3].timetuple()[:5] == (2016, 9, 12, 9, 0)
assert events[4].timetuple()[:5] == (2016, 9, 12, 10, 0)
assert events[-1].timetuple()[:5] == (2016, 9, 26, 11, 0)
assert len(events) == 12
# another start before the timeperiod
timeperiod = TimePeriod(
desk=desk, weekday=1, start_time=datetime.time(9, 0), end_time=datetime.time(12, 0)
)
events = timeperiod.get_time_slots(
events = timeperiod.as_shared_timeperiods().get_time_slots(
min_datetime=make_aware(datetime.datetime(2016, 9, 1)),
max_datetime=make_aware(datetime.datetime(2016, 10, 1)),
meeting_type=meeting_type,
meeting_duration=meeting_type.duration,
base_duration=agenda.get_base_meeting_duration(),
)
events = list(sorted(events, key=lambda x: x.start_datetime))
assert events[0].start_datetime.timetuple()[:5] == (2016, 9, 6, 9, 0)
assert events[-1].start_datetime.timetuple()[:5] == (2016, 9, 27, 11, 0)
events = sorted(events)
assert events[0].timetuple()[:5] == (2016, 9, 6, 9, 0)
assert events[-1].timetuple()[:5] == (2016, 9, 27, 11, 0)
assert len(events) == 12
# a start on the day of the timeperiod
timeperiod = TimePeriod(
desk=desk, weekday=3, start_time=datetime.time(9, 0), end_time=datetime.time(12, 0)
)
events = timeperiod.get_time_slots(
events = timeperiod.as_shared_timeperiods().get_time_slots(
min_datetime=make_aware(datetime.datetime(2016, 9, 1)),
max_datetime=make_aware(datetime.datetime(2016, 10, 1)),
meeting_type=meeting_type,
meeting_duration=meeting_type.duration,
base_duration=agenda.get_base_meeting_duration(),
)
events = list(sorted(events, key=lambda x: x.start_datetime))
assert events[0].start_datetime.timetuple()[:5] == (2016, 9, 1, 9, 0)
assert events[-1].start_datetime.timetuple()[:5] == (2016, 9, 29, 11, 0)
events = sorted(events)
assert events[0].timetuple()[:5] == (2016, 9, 1, 9, 0)
assert events[-1].timetuple()[:5] == (2016, 9, 29, 11, 0)
assert len(events) == 15
# a start after the day of the timeperiod
timeperiod = TimePeriod(
desk=desk, weekday=4, start_time=datetime.time(9, 0), end_time=datetime.time(12, 0)
)
events = timeperiod.get_time_slots(
events = timeperiod.as_shared_timeperiods().get_time_slots(
min_datetime=make_aware(datetime.datetime(2016, 9, 1)),
max_datetime=make_aware(datetime.datetime(2016, 10, 1)),
meeting_type=meeting_type,
meeting_duration=meeting_type.duration,
base_duration=agenda.get_base_meeting_duration(),
)
events = list(sorted(events, key=lambda x: x.start_datetime))
assert events[0].start_datetime.timetuple()[:5] == (2016, 9, 2, 9, 0)
assert events[-1].start_datetime.timetuple()[:5] == (2016, 9, 30, 11, 0)
events = sorted(events)
assert events[0].timetuple()[:5] == (2016, 9, 2, 9, 0)
assert events[-1].timetuple()[:5] == (2016, 9, 30, 11, 0)
assert len(events) == 15
# another start after the day of the timeperiod
timeperiod = TimePeriod(
desk=desk, weekday=5, start_time=datetime.time(9, 0), end_time=datetime.time(12, 0)
)
events = timeperiod.get_time_slots(
events = timeperiod.as_shared_timeperiods().get_time_slots(
min_datetime=make_aware(datetime.datetime(2016, 9, 1)),
max_datetime=make_aware(datetime.datetime(2016, 10, 1)),
meeting_type=meeting_type,
meeting_duration=meeting_type.duration,
base_duration=agenda.get_base_meeting_duration(),
)
events = list(sorted(events, key=lambda x: x.start_datetime))
assert events[0].start_datetime.timetuple()[:5] == (2016, 9, 3, 9, 0)
assert events[-1].start_datetime.timetuple()[:5] == (2016, 9, 24, 11, 0)
events = sorted(events)
assert events[0].timetuple()[:5] == (2016, 9, 3, 9, 0)
assert events[-1].timetuple()[:5] == (2016, 9, 24, 11, 0)
assert len(events) == 12
# shorter duration -> double the events
@ -102,15 +102,15 @@ def test_timeperiod_time_slots():
timeperiod = TimePeriod(
desk=desk, weekday=5, start_time=datetime.time(9, 0), end_time=datetime.time(12, 0)
)
events = timeperiod.get_time_slots(
events = timeperiod.as_shared_timeperiods().get_time_slots(
min_datetime=make_aware(datetime.datetime(2016, 9, 1)),
max_datetime=make_aware(datetime.datetime(2016, 10, 1)),
meeting_type=meeting_type,
meeting_duration=meeting_type.duration,
base_duration=agenda.get_base_meeting_duration(),
)
events = list(sorted(events, key=lambda x: x.start_datetime))
assert events[0].start_datetime.timetuple()[:5] == (2016, 9, 3, 9, 0)
assert events[-1].start_datetime.timetuple()[:5] == (2016, 9, 24, 11, 30)
events = sorted(events)
assert events[0].timetuple()[:5] == (2016, 9, 3, 9, 0)
assert events[-1].timetuple()[:5] == (2016, 9, 24, 11, 30)
assert len(events) == 24
@ -167,19 +167,19 @@ def test_desk_opening_hours():
desk = Desk.objects.create(label='Desk 1', agenda=agenda)
# nothing yet
hours = list(desk.get_opening_hours(datetime.date(2018, 1, 22)))
hours = desk.get_opening_hours(datetime.date(2018, 1, 22))
assert len(hours) == 0
# morning
TimePeriod(desk=desk, weekday=0, start_time=datetime.time(9, 0), end_time=datetime.time(12, 0)).save()
hours = list(desk.get_opening_hours(datetime.date(2018, 1, 22)))
hours = desk.get_opening_hours(datetime.date(2018, 1, 22))
assert len(hours) == 1
assert hours[0].begin.time() == datetime.time(9, 0)
assert hours[0].end.time() == datetime.time(12, 0)
# and afternoon
TimePeriod(desk=desk, weekday=0, start_time=datetime.time(14, 0), end_time=datetime.time(17, 0)).save()
hours = list(desk.get_opening_hours(datetime.date(2018, 1, 22)))
hours = desk.get_opening_hours(datetime.date(2018, 1, 22))
assert len(hours) == 2
assert hours[0].begin.time() == datetime.time(9, 0)
assert hours[0].end.time() == datetime.time(12, 0)
@ -195,13 +195,13 @@ def test_desk_opening_hours():
)
exception.save()
hours = list(desk.get_opening_hours(datetime.date(2018, 1, 22)))
hours = desk.get_opening_hours(datetime.date(2018, 1, 22))
assert len(hours) == 0
# closed from 11am
exception.start_datetime = make_aware(datetime.datetime(2018, 1, 22, 11, 0))
exception.save()
hours = list(desk.get_opening_hours(datetime.date(2018, 1, 22)))
hours = desk.get_opening_hours(datetime.date(2018, 1, 22))
assert len(hours) == 1
assert localtime(hours[0].begin).time() == datetime.time(9, 0)
assert localtime(hours[0].end).time() == datetime.time(11, 0)
@ -210,7 +210,7 @@ def test_desk_opening_hours():
exception.start_datetime = make_aware(datetime.datetime(2018, 1, 22, 10, 0))
exception.end_datetime = make_aware(datetime.datetime(2018, 1, 22, 11, 0))
exception.save()
hours = list(desk.get_opening_hours(datetime.date(2018, 1, 22)))
hours = desk.get_opening_hours(datetime.date(2018, 1, 22))
assert len(hours) == 3
assert localtime(hours[0].begin).time() == datetime.time(9, 0)
assert localtime(hours[0].end).time() == datetime.time(10, 0)
@ -232,15 +232,15 @@ def test_timeperiod_midnight_overlap_time_slots():
timeperiod = TimePeriod(
desk=desk, weekday=0, start_time=datetime.time(21, 0), end_time=datetime.time(23, 0)
)
events = timeperiod.get_time_slots(
events = timeperiod.as_shared_timeperiods().get_time_slots(
min_datetime=make_aware(datetime.datetime(2016, 9, 1)),
max_datetime=make_aware(datetime.datetime(2016, 10, 1)),
meeting_type=meeting_type,
meeting_duration=meeting_type.duration,
base_duration=agenda.get_base_meeting_duration(),
)
events = list(sorted(events, key=lambda x: x.start_datetime))
assert events[0].start_datetime.timetuple()[:5] == (2016, 9, 5, 21, 0)
assert events[1].start_datetime.timetuple()[:5] == (2016, 9, 12, 21, 0)
assert events[2].start_datetime.timetuple()[:5] == (2016, 9, 19, 21, 0)
assert events[3].start_datetime.timetuple()[:5] == (2016, 9, 26, 21, 0)
events = sorted(events)
assert events[0].timetuple()[:5] == (2016, 9, 5, 21, 0)
assert events[1].timetuple()[:5] == (2016, 9, 12, 21, 0)
assert events[2].timetuple()[:5] == (2016, 9, 19, 21, 0)
assert events[3].timetuple()[:5] == (2016, 9, 26, 21, 0)
assert len(events) == 4