1366 lines
52 KiB
Python
1366 lines
52 KiB
Python
# -*- coding: utf-8 -*-
|
|
# chrono - agendas system
|
|
# Copyright (C) 2016 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import copy
|
|
import collections
|
|
import datetime
|
|
import functools
|
|
import itertools
|
|
import math
|
|
import uuid
|
|
|
|
import requests
|
|
import vobject
|
|
|
|
import django
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import Group
|
|
from django.core.exceptions import FieldDoesNotExist
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.validators import MaxValueValidator
|
|
from django.db import models, transaction
|
|
from django.db.models import Count, Q, Case, When
|
|
from django.urls import reverse
|
|
from django.utils import functional
|
|
from django.utils.dates import WEEKDAYS
|
|
from django.utils.encoding import force_text
|
|
from django.utils.formats import date_format
|
|
from django.utils.text import slugify
|
|
from django.utils.timezone import localtime, now, make_aware, make_naive, is_aware
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
|
from jsonfield import JSONField
|
|
|
|
from chrono.interval import Interval, IntervalSet
|
|
|
|
|
|
# Allowed values for ``Agenda.kind``, as (stored value, translatable label)
# pairs; passed as ``choices`` to the ``kind`` field.
AGENDA_KINDS = (
    ('events', _('Events')),
    ('meetings', _('Meetings')),
    ('virtual', _('Virtual')),
)
|
|
|
|
|
|
def is_midnight(dtime):
    """Tell whether ``dtime``, once converted to local time, is at 00:00.

    Only the hour and the minute are inspected; seconds and microseconds
    are ignored.
    """
    local_dtime = localtime(dtime)
    return (local_dtime.hour, local_dtime.minute) == (0, 0)
|
|
|
|
|
|
def generate_slug(instance, **query_filters):
    """Return a slug for ``instance`` that is not used yet by its model.

    The search starts with ``instance.base_slug``; while an object with the
    candidate slug exists (among those matching ``query_filters``), the
    candidates ``<base>-1``, ``<base>-2``, ... are tried in turn.
    """
    base = instance.base_slug
    candidate = base
    for suffix in itertools.count(1):
        taken = instance._meta.model.objects.filter(slug=candidate, **query_filters).exists()
        if not taken:
            return candidate
        candidate = '%s-%s' % (base, suffix)
|
|
|
|
|
|
def clean_import_data(cls, data):
    """Return a deep copy of ``data`` without the keys unknown to ``cls``.

    A key is kept when ``cls._meta.get_field(key)`` succeeds and dropped when
    it raises ``FieldDoesNotExist``. ``data`` itself is left untouched.
    """

    def is_known_field(name):
        try:
            cls._meta.get_field(name)
        except FieldDoesNotExist:
            return False
        return True

    cleaned = copy.deepcopy(data)
    unknown_names = [name for name in data if not is_known_field(name)]
    for name in unknown_names:
        # remove unknown fields
        cleaned.pop(name)
    return cleaned
|
|
|
|
|
|
def validate_not_digit(value):
    """Validator rejecting values made only of digits.

    Raises ``ValidationError`` when ``value.isdigit()`` is true; returns
    ``None`` otherwise.
    """
    if not value.isdigit():
        return
    raise ValidationError(_('This value cannot be a number.'))
|
|
|
|
|
|
class ICSError(Exception):
    """Error related to ICS handling.

    NOTE(review): not raised in the visible part of this module; presumably
    used when processing iCalendar data elsewhere -- confirm against callers.
    """
|
|
|
|
|
|
class AgendaImportError(Exception):
    """Raised by ``Agenda.import_json`` when the imported data is unusable.

    This covers a missing role, a missing resource, or a virtual member that
    fails validation.
    """
|
|
|
|
|
|
class Agenda(models.Model):
    """An agenda of kind ``events``, ``meetings`` or ``virtual``.

    A virtual agenda has no events, desks or meeting types of its own: it is
    linked to real agendas through ``VirtualMember`` (``real_agendas``) and
    may carry excluded time periods (``excluded_timeperiods``).
    """

    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
    kind = models.CharField(_('Kind'), max_length=20, choices=AGENDA_KINDS, default='events')
    # Both delays may stay NULL on virtual agendas; for the other kinds
    # save() replaces None by 1 day (minimal) and eight weeks (maximal).
    minimal_booking_delay = models.PositiveIntegerField(
        _('Minimal booking delay (in days)'),
        default=None,
        null=True,
        blank=True,
        validators=[MaxValueValidator(10000)],
    )
    maximal_booking_delay = models.PositiveIntegerField(
        _('Maximal booking delay (in days)'),
        default=None,
        null=True,
        blank=True,
        validators=[MaxValueValidator(10000)],
    )  # eight weeks
    real_agendas = models.ManyToManyField(
        'self',
        related_name='virtual_agendas',
        symmetrical=False,
        through='VirtualMember',
        through_fields=('virtual_agenda', 'real_agenda'),
    )
    edit_role = models.ForeignKey(
        Group,
        blank=True,
        null=True,
        default=None,
        related_name='+',
        verbose_name=_('Edit Role'),
        on_delete=models.SET_NULL,
    )
    view_role = models.ForeignKey(
        Group,
        blank=True,
        null=True,
        default=None,
        related_name='+',
        verbose_name=_('View Role'),
        on_delete=models.SET_NULL,
    )
    resources = models.ManyToManyField('Resource')

    class Meta:
        ordering = ['label']

    def __str__(self):
        return self.label

    def save(self, *args, **kwargs):
        """Save the agenda, filling in the slug and default booking delays."""
        if not self.slug:
            self.slug = generate_slug(self)
        if self.kind != 'virtual':
            if self.minimal_booking_delay is None:
                self.minimal_booking_delay = 1
            if self.maximal_booking_delay is None:
                self.maximal_booking_delay = 8 * 7
        super(Agenda, self).save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slug candidate derived from the label (used by generate_slug)."""
        return slugify(self.label)

    def get_absolute_url(self):
        return reverse('chrono-manager-agenda-view', kwargs={'pk': self.id})

    def can_be_managed(self, user):
        """Return True for staff users and members of the edit role group."""
        if user.is_staff:
            return True
        group_ids = [x.id for x in user.groups.all()]
        return self.edit_role_id in group_ids

    def can_be_viewed(self, user):
        """Return True if ``user`` can manage the agenda or has the view role."""
        if self.can_be_managed(user):
            return True
        group_ids = [x.id for x in user.groups.all()]
        return self.view_role_id in group_ids

    def accept_meetings(self):
        """Tell whether meetings can be booked on this agenda.

        A virtual agenda accepts meetings when none of its real agendas is
        of a kind other than ``meetings``.
        """
        if self.kind == 'virtual':
            return not self.real_agendas.filter(~Q(kind='meetings')).exists()
        return self.kind == 'meetings'

    def get_real_agendas(self):
        """Return the real agendas behind this one (itself if not virtual)."""
        if self.kind == 'virtual':
            return self.real_agendas.all()
        return [self]

    def iter_meetingtypes(self, excluded_agenda=None):
        """Expose the agenda's meeting types.

        Straightforward on a real agenda: the non-deleted meeting types,
        ordered by slug.

        On a virtual agenda, transient (unsaved) meeting types are returned,
        built from the (slug, duration, label) combinations shared by every
        real agenda. ``excluded_agenda``, if given, is left out of that
        computation.
        """
        if self.kind == 'virtual':
            base_qs = MeetingType.objects.filter(agenda__virtual_agendas__in=[self], deleted=False)
            real_agendas = self.real_agendas
            if excluded_agenda:
                base_qs = base_qs.exclude(agenda=excluded_agenda)
                real_agendas = real_agendas.exclude(pk=excluded_agenda.pk)
            # keep only the combinations found in as many rows as there are
            # real agendas, i.e. present in all of them
            queryset = (
                base_qs.values('slug', 'duration', 'label')
                .annotate(total=Count('*'))
                .filter(total=real_agendas.count())
            )
            return [
                MeetingType(duration=mt['duration'], label=mt['label'], slug=mt['slug'])
                for mt in queryset.order_by('slug')
            ]

        return self.meetingtype_set.filter(deleted=False).all().order_by('slug')

    def get_meetingtype(self, id_=None, slug=None):
        """Return the meeting type matching ``id_`` or ``slug``.

        On a virtual agenda the given identifier is compared to the slugs of
        the transient meeting types. Raises ``MeetingType.DoesNotExist`` when
        nothing matches.
        """
        match = id_ or slug
        assert match, 'an identifier or a slug should be specified'

        if self.kind == 'virtual':
            for mt in self.iter_meetingtypes():
                if mt.slug == match:
                    return mt
            raise MeetingType.DoesNotExist()

        if id_:
            return MeetingType.objects.get(id=id_, agenda=self, deleted=False)
        return MeetingType.objects.get(slug=slug, agenda=self, deleted=False)

    def get_virtual_members(self):
        return VirtualMember.objects.filter(virtual_agenda=self)

    def get_base_meeting_duration(self):
        """Return the GCD of the durations of the agenda's meeting types.

        Raises ``ValueError`` when the agenda has no meeting type.
        """
        durations = [x.duration for x in self.iter_meetingtypes()]
        if not durations:
            raise ValueError()
        return functools.reduce(math.gcd, durations)

    def export_json(self):
        """Serialize the agenda and its related objects as a dict."""
        agenda = {
            'label': self.label,
            'slug': self.slug,
            'kind': self.kind,
            'minimal_booking_delay': self.minimal_booking_delay,
            'maximal_booking_delay': self.maximal_booking_delay,
            'permissions': {
                'view': self.view_role.name if self.view_role else None,
                'edit': self.edit_role.name if self.edit_role else None,
            },
            'resources': [x.slug for x in self.resources.all()],
        }
        if self.kind == 'events':
            agenda['events'] = [x.export_json() for x in self.event_set.all()]
        elif self.kind == 'meetings':
            agenda['meetingtypes'] = [x.export_json() for x in self.meetingtype_set.all()]
            agenda['desks'] = [desk.export_json() for desk in self.desk_set.all()]
        elif self.kind == 'virtual':
            agenda['excluded_timeperiods'] = [x.export_json() for x in self.excluded_timeperiods.all()]
            agenda['real_agendas'] = [{'slug': x.slug, 'kind': x.kind} for x in self.real_agendas.all()]
        return agenda

    @classmethod
    def import_json(cls, data, overwrite=False):
        """Create or update an agenda from a dict produced by export_json().

        ``data`` and the dicts nested in it are not modified. With
        ``overwrite`` the existing related objects of the agenda are deleted
        before the imported ones are added.

        Returns True if the agenda was created, False if an existing agenda
        (same slug) was updated. Raises ``AgendaImportError`` for a missing
        role or resource, or when a virtual member does not validate.
        """
        data = data.copy()
        permissions = data.pop('permissions') or {}
        if data['kind'] == 'events':
            events = data.pop('events')
        elif data['kind'] == 'meetings':
            meetingtypes = data.pop('meetingtypes')
            desks = data.pop('desks')
        elif data['kind'] == 'virtual':
            excluded_timeperiods = data.pop('excluded_timeperiods')
            real_agendas = data.pop('real_agendas')
        for permission in ('view', 'edit'):
            if permissions.get(permission):
                try:
                    data[permission + '_role'] = Group.objects.get(name=permissions[permission])
                except Group.DoesNotExist:
                    raise AgendaImportError(_('Missing "%s" role') % permissions[permission])
        resources_slug = data.pop('resources', [])
        resources_by_slug = {r.slug: r for r in Resource.objects.filter(slug__in=resources_slug)}
        for resource_slug in resources_slug:
            if resource_slug not in resources_by_slug:
                raise AgendaImportError(_('Missing "%s" resource') % resource_slug)
        data = clean_import_data(cls, data)
        agenda, created = cls.objects.get_or_create(slug=data['slug'], defaults=data)
        if not created:
            for k, v in data.items():
                setattr(agenda, k, v)
            # get_or_create() does not apply ``defaults`` to an existing row:
            # the new values must be saved explicitly.
            agenda.save()
        # Related objects are imported from copies of the nested dicts so the
        # caller's data can be reused (e.g. imported a second time).
        if data['kind'] == 'events':
            if overwrite:
                Event.objects.filter(agenda=agenda).delete()
            for event_data in events:
                Event.import_json(dict(event_data, agenda=agenda)).save()
        elif data['kind'] == 'meetings':
            if overwrite:
                MeetingType.objects.filter(agenda=agenda).delete()
                Desk.objects.filter(agenda=agenda).delete()
            for type_data in meetingtypes:
                MeetingType.import_json(dict(type_data, agenda=agenda)).save()
            for desk in desks:
                Desk.import_json(dict(desk, agenda=agenda)).save()
            agenda.resources.set(resources_by_slug.values())
        elif data['kind'] == 'virtual':
            if overwrite:
                TimePeriod.objects.filter(agenda=agenda).delete()
                VirtualMember.objects.filter(virtual_agenda=agenda).delete()
            for excluded_timeperiod in excluded_timeperiods:
                TimePeriod.import_json(dict(excluded_timeperiod, agenda=agenda)).save()
            for real_agenda_data in real_agendas:
                real_agenda = Agenda.objects.get(
                    slug=real_agenda_data['slug'], kind=real_agenda_data['kind']
                )
                try:
                    # do not rebind ``created``: it describes the agenda and
                    # is the return value of this method
                    member, member_created = VirtualMember.objects.get_or_create(
                        virtual_agenda=agenda, real_agenda=real_agenda
                    )
                    member.clean()
                except ValidationError as exc:
                    raise AgendaImportError(' '.join(exc.messages))

        return created

    def duplicate(self, label=None):
        """Create, save and return a copy of the agenda and its related objects.

        The copy gets ``label`` (or "Copy of <label>") and a newly generated
        slug.
        """
        # clone current agenda
        new_agenda = copy.deepcopy(self)
        new_agenda.pk = None
        new_agenda.label = label or _('Copy of %s') % self.label
        # reset slug
        new_agenda.slug = None
        new_agenda.save()

        # clone related objects
        if self.kind == 'meetings':
            for meeting_type in self.meetingtype_set.all():
                meeting_type.duplicate(agenda_target=new_agenda)
            for desk in self.desk_set.all():
                desk.duplicate(agenda_target=new_agenda)
            new_agenda.resources.set(self.resources.all())
        elif self.kind == 'events':
            for event in self.event_set.all():
                event.duplicate(agenda_target=new_agenda)
        elif self.kind == 'virtual':
            for timeperiod in self.excluded_timeperiods.all():
                timeperiod.duplicate(agenda_target=new_agenda)
            for real_agenda in self.real_agendas.all():
                VirtualMember.objects.create(virtual_agenda=new_agenda, real_agenda=real_agenda)
        return new_agenda

    def get_effective_time_periods(self):
        '''Regroup timeperiods by desks.

        List all timeperiods, timeperiods having the same begin_time and
        end_time are regrouped in a SharedTimePeriod object, which has a
        list of desks instead of only one desk.

        Raises ValueError on agendas that are neither virtual nor meetings.
        '''
        if self.kind == 'virtual':
            return self.get_effective_time_periods_virtual()
        elif self.kind == 'meetings':
            return self.get_effective_time_periods_meetings()
        else:
            raise ValueError('does not work with kind %r' % self.kind)

    def get_effective_time_periods_meetings(self):
        '''List timeperiod instances for all desks of the agenda, convert them
        into an Interval of WeekTime which can be compared and regrouped using
        itertools.groupby.
        '''
        # the queryset ordering matches the grouping key, as required by
        # itertools.groupby
        yield from (
            SharedTimePeriod.from_weektime_interval(
                weektime_interval, desks=[time_period.desk for time_period in time_periods],
            )
            for weektime_interval, time_periods in itertools.groupby(
                TimePeriod.objects.filter(desk__agenda=self)
                .prefetch_related('desk')
                .order_by('weekday', 'start_time', 'end_time'),
                key=TimePeriod.as_weektime_interval,
            )
        )

    def get_effective_time_periods_virtual(self):
        '''List timeperiod instances for all desks of all real agendas of this
        virtual agenda, convert them into an Interval of WeekTime which can be
        compared and regrouped using itertools.groupby.

        The excluded time periods of the virtual agenda are subtracted from
        each interval.
        '''
        closed_hours_by_days = IntervalSet.from_ordered(
            [
                time_period.as_weektime_interval()
                for time_period in self.excluded_timeperiods.order_by('weekday', 'start_time', 'end_time')
            ]
        )
        for time_period_interval, time_periods in itertools.groupby(
            TimePeriod.objects.filter(desk__agenda__virtual_agendas=self)
            .order_by('weekday', 'start_time', 'end_time')
            .prefetch_related('desk'),
            key=lambda tp: tp.as_weektime_interval(),
        ):
            time_periods = list(time_periods)
            desks = [time_period.desk for time_period in time_periods]
            if not closed_hours_by_days:
                yield SharedTimePeriod.from_weektime_interval(time_period_interval, desks=desks)
            else:
                for weektime_interval in IntervalSet.simple(*time_period_interval) - closed_hours_by_days:
                    yield SharedTimePeriod.from_weektime_interval(weektime_interval, desks=desks)

    def get_opened_events(self):
        """Return the queryset of events currently open to booking.

        Only valid on ``events`` agendas. Past events, events not published
        yet and events outside of the minimal/maximal booking delays are
        left out.
        """
        assert self.kind == 'events'

        entries = self.event_set.all()
        # we never want to allow booking for past events.
        entries = entries.filter(start_datetime__gte=localtime(now()))
        # exclude non published events
        entries = entries.filter(
            Q(publication_date__isnull=True) | Q(publication_date__lte=localtime(now()).date())
        )
        # Delay bounds are whole days: truncate to the very start of the day
        # (seconds and microseconds included) so that an event starting at
        # midnight sharp is handled like in Event.in_bookable_period().
        if self.minimal_booking_delay:
            entries = entries.filter(
                start_datetime__gte=localtime(
                    now() + datetime.timedelta(days=self.minimal_booking_delay)
                ).replace(hour=0, minute=0, second=0, microsecond=0)
            )
        if self.maximal_booking_delay:
            entries = entries.filter(
                start_datetime__lt=localtime(
                    now() + datetime.timedelta(days=self.maximal_booking_delay)
                ).replace(hour=0, minute=0, second=0, microsecond=0)
            )
        return entries
|
|
|
|
|
|
class VirtualMember(models.Model):
    '''Through model to link virtual agendas to their real agendas.

    Real agendas linked to a virtual agenda MUST all have the same list of
    MeetingType based on their label, slug and duration. It's enforced by
    VirtualMember.clean() and the related management views.
    '''

    virtual_agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE, related_name='real_members')
    real_agenda = models.ForeignKey(
        Agenda, on_delete=models.CASCADE, related_name='virtual_members', verbose_name='Agenda'
    )

    class Meta:
        unique_together = (('virtual_agenda', 'real_agenda'),)

    def clean(self):
        '''Check that the real agenda has the meeting types of the virtual one.

        The meeting types of the virtual agenda are computed without this
        member's real agenda. Soft-deleted meeting types of the real agenda
        are ignored, like Agenda.iter_meetingtypes() does.

        Raises ValidationError listing the missing meeting types or, when
        none is missing, the extra ones.
        '''
        error_msg = [_('This agenda does not have the same meeting types provided by the virtual agenda.')]
        virtual_meetingtypes = self.virtual_agenda.iter_meetingtypes(excluded_agenda=self.real_agenda)
        real_meetingtypes = MeetingType.objects.filter(agenda=self.real_agenda, deleted=False)

        missing = False
        for meetingtype in virtual_meetingtypes:
            found = real_meetingtypes.filter(
                label=meetingtype.label, slug=meetingtype.slug, duration=meetingtype.duration,
            ).exists()
            if not found:
                missing = True
                error_msg.append(
                    _(
                        'Meeting type "%(label)s" (%(duration)s minutes) '
                        '(identifier: %(slug)s) does no exist.'
                    )
                    % {'label': meetingtype.label, 'duration': meetingtype.duration, 'slug': meetingtype.slug}
                )
        if missing:
            raise ValidationError(error_msg)

        # Every meeting type of the virtual agenda exists in the real agenda;
        # a different count means the real agenda has extra ones. Nothing is
        # checked when the virtual agenda exposes no meeting type.
        num_virt_meetingtypes = len(virtual_meetingtypes)
        if not num_virt_meetingtypes or num_virt_meetingtypes == real_meetingtypes.count():
            return

        extra_qs = real_meetingtypes
        for virt_meetingtype in virtual_meetingtypes:
            extra_qs = extra_qs.exclude(
                slug=virt_meetingtype.slug,
                label=virt_meetingtype.label,
                duration=virt_meetingtype.duration,
            )
        extra_labels = [extra_meeting_type.label for extra_meeting_type in extra_qs]
        if extra_labels:
            error_msg.extend(_('Extra meeting type, "%s".') % label for label in extra_labels)
            raise ValidationError(error_msg)
|
|
|
|
|
|
# (weekday number, label) pairs in weekday order, used as ``choices`` of
# ``TimePeriod.weekday``.
WEEKDAYS_LIST = sorted(WEEKDAYS.items(), key=lambda day_and_label: day_and_label[0])
|
|
|
|
|
|
class WeekTime(collections.namedtuple('WeekTime', 'weekday time')):
    '''A point in time within a week: a weekday number and a time of day,
    ex.: Monday at 5 o'clock.
    '''

    def __repr__(self):
        day_name = force_text(WEEKDAYS[self.weekday])
        time_of_day = date_format(self.time, 'TIME_FORMAT')
        return '%s / %s' % (day_name, time_of_day)
|
|
|
|
|
|
class TimePeriod(models.Model):
    """Weekly time range (weekday, start time, end time).

    It references either a desk or, through ``agenda`` (related name
    ``excluded_timeperiods``), a virtual agenda.
    """

    weekday = models.IntegerField(_('Week day'), choices=WEEKDAYS_LIST)
    start_time = models.TimeField(_('Start'))
    end_time = models.TimeField(_('End'))
    desk = models.ForeignKey('Desk', on_delete=models.CASCADE, null=True)
    agenda = models.ForeignKey(
        Agenda, on_delete=models.CASCADE, null=True, related_name='excluded_timeperiods'
    )

    class Meta:
        ordering = ['weekday', 'start_time']

    def __str__(self):
        day_name = force_text(WEEKDAYS[self.weekday])
        opening = date_format(self.start_time, 'TIME_FORMAT')
        closing = date_format(self.end_time, 'TIME_FORMAT')
        return u'%s / %s → %s' % (day_name, opening, closing)

    def save(self, *args, **kwargs):
        """Save the period; an attached agenda must be a virtual one."""
        if self.agenda:
            assert self.agenda.kind == 'virtual', "a time period can only reference a virtual agenda"
        super().save(*args, **kwargs)

    @property
    def weekday_str(self):
        """Label of the weekday, looked up in WEEKDAYS."""
        return WEEKDAYS[self.weekday]

    @classmethod
    def import_json(cls, data):
        """Build an unsaved period from ``data``, ignoring unknown keys."""
        return cls(**clean_import_data(cls, data))

    def export_json(self):
        """Serialize the period; times are formatted as HH:MM."""
        time_format = '%H:%M'
        return {
            'weekday': self.weekday,
            'start_time': self.start_time.strftime(time_format),
            'end_time': self.end_time.strftime(time_format),
        }

    def duplicate(self, desk_target=None, agenda_target=None):
        """Save and return a copy of the period.

        The copy is attached to ``desk_target`` / ``agenda_target`` when
        given, else to the desk / agenda of the original.
        """
        clone = copy.deepcopy(self)
        # a cleared primary key makes save() insert a new row
        clone.pk = None
        clone.desk = desk_target if desk_target else self.desk
        clone.agenda = agenda_target if agenda_target else self.agenda
        clone.save()
        return clone

    def as_weektime_interval(self):
        """Return the period as an Interval between two WeekTime points."""
        begin = WeekTime(self.weekday, self.start_time)
        end = WeekTime(self.weekday, self.end_time)
        return Interval(begin, end)

    def as_shared_timeperiods(self):
        """Return a SharedTimePeriod holding only this period's desk."""
        return SharedTimePeriod(
            weekday=self.weekday,
            start_time=self.start_time,
            end_time=self.end_time,
            desks=[self.desk],
        )
|
|
|
|
|
|
@functools.total_ordering
class SharedTimePeriod(object):
    '''
    Hold common timeperiod for multiple desks.

    To improve performance when generating meetings slots for virtual
    agendas or agendas with many desks, we deduplicate time-periods between
    all desks of all agendas.

    Deduplication is based on a common key, and implemented through __eq__
    and __lt__ which will be used by itertools.groupby():

       (weekday, start_time, end_time)

    ``desks`` is not part of the key. Instances are not hashable (__eq__ is
    defined without __hash__).

    NOTE(review): the deduplication itself and get_all_slots(), where
    timeperiods are presumably re-duplicated if the min_datetime /
    max_datetime of the desks' agendas differ, are not defined in this
    class -- check their code for details.
    '''

    __slots__ = ['weekday', 'start_time', 'end_time', 'desks']

    def __init__(self, weekday, start_time, end_time, desks):
        self.weekday = weekday
        self.start_time = start_time
        self.end_time = end_time
        # desks are stored as a set: duplicates are dropped
        self.desks = set(desks)

    def __str__(self):
        return u'%s / %s → %s' % (
            force_text(WEEKDAYS[self.weekday]),
            date_format(self.start_time, 'TIME_FORMAT'),
            date_format(self.end_time, 'TIME_FORMAT'),
        )

    @staticmethod
    def _key_of(period):
        '''Comparison key of any object exposing the three period attributes.'''
        return (period.weekday, period.start_time, period.end_time)

    def __eq__(self, other):
        # Objects lacking the period attributes are not comparable: let
        # Python fall back to its default handling instead of raising
        # AttributeError.
        try:
            return self._key_of(self) == self._key_of(other)
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self._key_of(self) < self._key_of(other)
        except AttributeError:
            return NotImplemented

    def get_time_slots(self, min_datetime, max_datetime, meeting_duration, base_duration):
        '''Generate all possible time slots between min_datetime and
        max_datetime of duration meeting_duration minutes and spaced by
        base_duration minutes, i.e.

        compute a list [a,b] -> [c,d] -> ...
        where b-a = meeting_duration and c-a = base_duration.

        We start with the first time following min_datetime and being on
        the same weekday of the current period.

        Then we iterate, advancing by base_duration minutes each time.

        If we cross the end_time of the period or end of the current_day
        (means end_time is midnight), it advance time to self.start_time on
        the next week (same weekday, same start, one week in the future).

        When it crosses max_datetime it stops.

        Generated start_datetime MUST be in the local timezone as the API
        needs it to generate stable ids.

        Yields the start datetime of each slot.
        '''
        meeting_duration = datetime.timedelta(minutes=meeting_duration)
        duration = datetime.timedelta(minutes=base_duration)

        # first day on or after min_datetime falling on the period's weekday
        real_min_datetime = min_datetime + datetime.timedelta(days=self.weekday - min_datetime.weekday())
        if real_min_datetime < min_datetime:
            real_min_datetime += datetime.timedelta(days=7)

        # make sure datetime in local timezone, it's ABSOLUTELY necessary
        # to have stable event ids in the API.
        event_datetime = make_aware(make_naive(real_min_datetime)).replace(
            hour=self.start_time.hour, minute=self.start_time.minute, second=0, microsecond=0
        )
        while event_datetime < max_datetime:
            end_time = event_datetime + meeting_duration
            next_time = event_datetime + duration
            if end_time.time() > self.end_time or event_datetime.date() != next_time.date():
                # switch to naive time for day/week changes
                event_datetime = make_naive(event_datetime)
                # back to morning
                event_datetime = event_datetime.replace(
                    hour=self.start_time.hour, minute=self.start_time.minute
                )
                # but next week
                event_datetime += datetime.timedelta(days=7)
                # and re-align to timezone afterwards
                event_datetime = make_aware(event_datetime)
                next_time = event_datetime + duration

            # the jump to next week may have moved us past the upper bound
            if event_datetime > max_datetime:
                break

            yield event_datetime
            event_datetime = next_time

    @classmethod
    def from_weektime_interval(cls, weektime_interval, desks=()):
        '''Build an instance from a (begin, end) pair of WeekTime-like points.

        Both points must be on the same weekday.
        '''
        begin, end = weektime_interval
        assert begin.weekday == end.weekday
        return cls(weekday=begin.weekday, start_time=begin.time, end_time=end.time, desks=desks,)
|
|
|
|
|
|
class MeetingType(models.Model):
    """Kind of meeting (label, slug, duration in minutes) offered by an agenda.

    ``deleted`` is a soft-deletion flag; the slug is unique per agenda.
    """

    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160)
    duration = models.IntegerField(_('Duration (in minutes)'), default=30)
    deleted = models.BooleanField(_('Deleted'), default=False)

    class Meta:
        ordering = ['duration', 'label']
        unique_together = ['agenda', 'slug']

    def save(self, *args, **kwargs):
        """Save the meeting type, generating a slug when it has none."""
        assert self.agenda.kind != 'virtual', "a meetingtype can't reference a virtual agenda"
        if not self.slug:
            self.slug = generate_slug(self, agenda=self.agenda)
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slug candidate derived from the label (used by generate_slug)."""
        return slugify(self.label)

    @classmethod
    def import_json(cls, data):
        """Get or create the meeting type described by ``data``.

        Lookup is done on (slug, agenda). An existing object gets the
        imported values set on it but is not saved here.
        """
        cleaned = clean_import_data(cls, data)
        lookup = {'slug': cleaned['slug'], 'agenda': cleaned['agenda']}
        meeting_type, created = cls.objects.get_or_create(defaults=cleaned, **lookup)
        if created:
            return meeting_type
        for name, value in cleaned.items():
            setattr(meeting_type, name, value)
        return meeting_type

    def export_json(self):
        """Serialize label, slug and duration."""
        exported_fields = ('label', 'slug', 'duration')
        return {name: getattr(self, name) for name in exported_fields}

    def duplicate(self, agenda_target=None):
        """Save and return a copy of the meeting type.

        With ``agenda_target`` the copy is moved to that agenda and keeps its
        slug; otherwise it stays in the same agenda and gets a new slug.
        """
        clone = copy.deepcopy(self)
        clone.pk = None
        if not agenda_target:
            # same agenda: reset the slug so that save() generates a new one
            clone.slug = None
        else:
            clone.agenda = agenda_target
        clone.save()
        return clone
|
|
|
|
|
|
class Event(models.Model):
    """A dated event of an agenda, with a number of places to book.

    ``meeting_type``, ``desk`` and ``resources`` are optional links;
    NOTE(review): presumably set on events created for meetings -- confirm
    against the booking code. The slug is unique per agenda.
    """

    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    start_datetime = models.DateTimeField(_('Date/time'))
    duration = models.PositiveIntegerField(_('Duration (in minutes)'), default=None, null=True, blank=True)
    publication_date = models.DateField(_('Publication date'), blank=True, null=True)
    places = models.PositiveIntegerField(_('Places'))
    waiting_list_places = models.PositiveIntegerField(_('Places in waiting list'), default=0)
    label = models.CharField(
        _('Label'),
        max_length=150,
        null=True,
        blank=True,
        help_text=_('Optional label to identify this date.'),
    )
    slug = models.SlugField(_('Identifier'), max_length=160, blank=True, validators=[validate_not_digit])
    description = models.TextField(
        _('Description'), null=True, blank=True, help_text=_('Optional event description.')
    )
    pricing = models.CharField(_('Pricing'), max_length=150, null=True, blank=True)
    url = models.CharField(_('URL'), max_length=200, null=True, blank=True)
    # kept up to date by check_full(), called from save() and Booking.save()
    full = models.BooleanField(default=False)
    meeting_type = models.ForeignKey(MeetingType, null=True, on_delete=models.CASCADE)
    desk = models.ForeignKey('Desk', null=True, on_delete=models.CASCADE)
    resources = models.ManyToManyField('Resource')

    class Meta:
        ordering = ['agenda', 'start_datetime', 'duration', 'label']
        unique_together = ('agenda', 'slug')

    def __str__(self):
        if self.label:
            return self.label
        return date_format(localtime(self.start_datetime), format='DATETIME_FORMAT')

    def save(self, *args, **kwargs):
        """Save the event, refreshing ``full`` and generating a missing slug."""
        assert self.agenda.kind != 'virtual', "an event can't reference a virtual agenda"
        assert not (self.slug and self.slug.isdigit()), 'slug cannot be a number'
        self.check_full()
        if not self.slug:
            self.slug = generate_slug(self, agenda=self.agenda)
        return super(Event, self).save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slug candidate derived from the label or from the agenda label."""
        # label can be empty
        return slugify(self.label or ('%s-event' % self.agenda.label))

    @functional.cached_property
    def main_list_full(self):
        """True when all the places (waiting list excluded) are booked."""
        return bool(self.booked_places >= self.places)

    def check_full(self):
        """Recompute ``full`` (without saving).

        The event is full when all places are booked and there is no waiting
        list, or when the waiting list itself is full.
        """
        self.full = bool(
            (self.booked_places >= self.places and self.waiting_list_places == 0)
            or (self.waiting_list_places and self.waiting_list >= self.waiting_list_places)
        )

    def in_bookable_period(self):
        """Tell whether the event can be booked right now.

        It must be published, not started yet, and its day must respect the
        minimal and maximal booking delays of the agenda.
        """
        today = localtime(now()).date()
        if self.publication_date and today < self.publication_date:
            return False
        event_day = localtime(self.start_datetime).date()
        days_to_event = event_day - today
        if days_to_event < datetime.timedelta(days=self.agenda.minimal_booking_delay):
            return False
        if self.agenda.maximal_booking_delay:
            if days_to_event >= datetime.timedelta(days=self.agenda.maximal_booking_delay):
                return False
        if self.start_datetime < now():
            # past the event date, we may want in the future to allow for some
            # extra late booking but it's forbidden for now.
            return False
        return True

    @staticmethod
    def annotate_queryset(qs):
        """Annotate an event queryset with ``booked_places_count`` and
        ``waiting_list_count`` (cancelled bookings are not counted).
        """
        if django.VERSION < (2, 0):
            # Count() has no ``filter`` argument before Django 2.0
            return qs.annotate(
                booked_places_count=Count(
                    Case(
                        When(
                            booking__cancellation_datetime__isnull=True,
                            booking__in_waiting_list=False,
                            then='booking',
                        )
                    )
                ),
                waiting_list_count=Count(
                    Case(
                        When(
                            booking__cancellation_datetime__isnull=True,
                            booking__in_waiting_list=True,
                            then='booking',
                        )
                    )
                ),
            )
        else:
            return qs.annotate(
                booked_places_count=Count(
                    'booking',
                    filter=Q(booking__cancellation_datetime__isnull=True, booking__in_waiting_list=False),
                ),
                waiting_list_count=Count(
                    'booking',
                    filter=Q(booking__cancellation_datetime__isnull=True, booking__in_waiting_list=True),
                ),
            )

    @property
    def booked_places(self):
        """Number of non-cancelled bookings outside of the waiting list."""
        return self.booking_set.filter(cancellation_datetime__isnull=True, in_waiting_list=False).count()

    @property
    def waiting_list(self):
        """Number of non-cancelled bookings in the waiting list."""
        return self.booking_set.filter(cancellation_datetime__isnull=True, in_waiting_list=True).count()

    @property
    def end_datetime(self):
        """End of the event, or None when no duration is known.

        The duration of the meeting type, if any, takes precedence over the
        event's own duration.
        """
        if self.meeting_type:
            minutes = self.meeting_type.duration
        else:
            minutes = self.duration
        if minutes is None:
            return None
        return self.start_datetime + datetime.timedelta(minutes=minutes)

    def get_absolute_url(self):
        return reverse('chrono-manager-event-edit', kwargs={'pk': self.agenda.id, 'event_pk': self.id})

    @classmethod
    def import_json(cls, data):
        """Build an event from a dict produced by export_json().

        ``data`` is not modified. When a slug is given, the event with that
        slug in the imported agenda is fetched (or created) and the imported
        values are set on it; it is up to the caller to save it. Without a
        slug a new unsaved event is returned.
        """
        # work on a copy: the caller's dict keeps its serialized datetime
        data = dict(data)
        data['start_datetime'] = make_aware(
            datetime.datetime.strptime(data['start_datetime'], '%Y-%m-%d %H:%M:%S')
        )
        data = clean_import_data(cls, data)
        if data.get('slug'):
            # slugs are only unique per agenda: restrict the lookup to the
            # imported agenda so another agenda's event is never picked up
            lookup = {'slug': data['slug']}
            if 'agenda' in data:
                lookup['agenda'] = data['agenda']
            event, created = cls.objects.get_or_create(defaults=data, **lookup)
            if not created:
                for k, v in data.items():
                    setattr(event, k, v)
            return event
        return cls(**data)

    def export_json(self):
        """Serialize the event; the start datetime is exported as naive."""
        return {
            'start_datetime': make_naive(self.start_datetime).strftime('%Y-%m-%d %H:%M:%S'),
            'publication_date': self.publication_date.strftime('%Y-%m-%d') if self.publication_date else None,
            'places': self.places,
            'waiting_list_places': self.waiting_list_places,
            'label': self.label,
            'slug': self.slug,
            'description': self.description,
            'url': self.url,
            'pricing': self.pricing,
        }

    def duplicate(self, agenda_target=None):
        """Save and return a copy of the event.

        With ``agenda_target`` the copy is moved to that agenda and keeps its
        slug; otherwise it stays in the same agenda and gets a new slug.
        """
        new_event = copy.deepcopy(self)
        new_event.pk = None
        if agenda_target:
            new_event.agenda = agenda_target
        else:
            new_event.slug = None
        new_event.save()

        return new_event
|
|
|
|
|
|
class Booking(models.Model):
    """Booking of an event.

    Several bookings can be grouped: secondary bookings reference their
    primary booking and follow it when it is cancelled, accepted or
    suspended.
    """

    event = models.ForeignKey(Event, on_delete=models.CASCADE)
    extra_data = JSONField(null=True)
    cancellation_datetime = models.DateTimeField(null=True)
    in_waiting_list = models.BooleanField(default=False)
    creation_datetime = models.DateTimeField(auto_now_add=True)
    # primary booking is used to group multiple bookings together
    primary_booking = models.ForeignKey(
        'self', null=True, on_delete=models.CASCADE, related_name='secondary_booking_set'
    )

    label = models.CharField(max_length=250, blank=True)
    user_display_label = models.CharField(
        verbose_name=_('Label displayed to user'), max_length=250, blank=True
    )
    user_external_id = models.CharField(max_length=250, blank=True)
    user_name = models.CharField(max_length=250, blank=True)
    backoffice_url = models.URLField(blank=True)

    def save(self, *args, **kwargs):
        """Save the booking and, in the same transaction, the event if its
        ``full`` status changed as a result.
        """
        with transaction.atomic():
            super(Booking, self).save(*args, **kwargs)
            initial_value = self.event.full
            self.event.check_full()
            if self.event.full != initial_value:
                self.event.save()

    def cancel(self):
        """Cancel the booking and its secondary bookings."""
        timestamp = now()
        with transaction.atomic():
            self.secondary_booking_set.update(cancellation_datetime=timestamp)
            self.cancellation_datetime = timestamp
            self.save()

    def accept(self):
        """Move the booking and its secondary bookings out of the waiting list."""
        self.in_waiting_list = False
        with transaction.atomic():
            self.secondary_booking_set.update(in_waiting_list=False)
            self.save()

    def suspend(self):
        """Move the booking and its secondary bookings to the waiting list."""
        self.in_waiting_list = True
        with transaction.atomic():
            self.secondary_booking_set.update(in_waiting_list=True)
            self.save()

    def get_ics(self, request=None):
        """Return the booking serialized as an iCalendar document.

        The description, location, comment and url properties are taken from
        the query string of ``request`` when given there, else from
        ``extra_data``.
        """
        ics = vobject.iCalendar()
        ics.add('prodid').value = '-//Entr\'ouvert//NON SGML Publik'
        vevent = vobject.newFromBehavior('vevent')
        vevent.add('uid').value = '%s-%s-%s' % (
            self.event.start_datetime.isoformat(),
            self.event.agenda.pk,
            self.pk,
        )

        vevent.add('summary').value = self.user_display_label or self.label
        vevent.add('dtstart').value = self.event.start_datetime
        if self.user_name:
            vevent.add('attendee').value = self.user_name
        if self.event.end_datetime:
            vevent.add('dtend').value = self.event.end_datetime

        # extra_data is nullable: a booking without extra data must still be
        # exportable
        extra_data = self.extra_data or {}
        for field in ('description', 'location', 'comment', 'url'):
            request_value = request.GET.get(field) if request else None
            field_value = request_value or extra_data.get(field)
            if field_value:
                vevent.add(field).value = field_value
        ics.add(vevent)
        return ics.serialize()

    def clone(self, in_waiting_list=False, primary_booking=None, save=True):
        """Return a copy of the booking with the given waiting list status
        and primary booking; the copy is saved unless ``save`` is false.
        """
        new_booking = copy.deepcopy(self)
        new_booking.id = None
        new_booking.in_waiting_list = in_waiting_list
        new_booking.primary_booking = primary_booking
        if save:
            new_booking.save()
        return new_booking
|
|
|
|
|
|
# (begin, end) pair, used for the ranges returned by Desk.get_opening_hours()
OpeningHour = collections.namedtuple('OpeningHour', ('begin', 'end'))
|
|
|
|
|
|
class Desk(models.Model):
    """A desk attached to an agenda.

    Opening hours are built from the related time periods, minus the
    related time period exceptions (created directly or imported from an
    ICS file or URL).
    """

    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    label = models.CharField(_('Label'), max_length=150)
    # unique within an agenda, see Meta.unique_together
    slug = models.SlugField(_('Identifier'), max_length=160)

    def __str__(self):
        return self.label

    class Meta:
        ordering = ['label']
        unique_together = ['agenda', 'slug']

    def save(self, *args, **kwargs):
        """Save the desk, generating its slug when none is set.

        Raises AssertionError when the agenda is of kind 'virtual'.
        """
        assert self.agenda.kind != 'virtual', "a desk can't reference a virtual agenda"
        if not self.slug:
            self.slug = generate_slug(self, agenda=self.agenda)
        super(Desk, self).save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slugified label."""
        return slugify(self.label)

    @classmethod
    def import_json(cls, data):
        """Create or update a desk, its time periods and exceptions from ``data``.

        ``data`` is modified in place: the 'timeperiods' and 'exceptions'
        keys are popped before the remaining keys go through
        ``clean_import_data()``.  The desk is looked up by slug and agenda.

        Returns the desk instance.
        """
        timeperiods = data.pop('timeperiods', [])
        exceptions = data.pop('exceptions', [])
        data = clean_import_data(cls, data)
        instance, created = cls.objects.get_or_create(slug=data['slug'], agenda=data['agenda'], defaults=data)
        if not created:
            for k, v in data.items():
                setattr(instance, k, v)
            # persist the imported values, they were previously only set
            # on the in-memory instance and lost
            instance.save()
        for timeperiod in timeperiods:
            timeperiod['desk'] = instance
            TimePeriod.import_json(timeperiod).save()
        for exception in exceptions:
            exception['desk'] = instance
            TimePeriodException.import_json(exception).save()
        return instance

    def export_json(self):
        """Return the desk, its time periods and exceptions as a dict."""
        return {
            'label': self.label,
            'slug': self.slug,
            'timeperiods': [time_period.export_json() for time_period in self.timeperiod_set.all()],
            'exceptions': [exception.export_json() for exception in self.timeperiodexception_set.all()],
        }

    def duplicate(self, label=None, agenda_target=None):
        """Store and return a copy of this desk and of its related objects.

        The copy is labelled ``label`` or, by default, 'Copy of <label>'.
        With ``agenda_target`` it is attached to that agenda and keeps its
        slug; otherwise the slug is cleared so that ``save()`` generates a
        new one.  Time periods, exceptions and exception sources are
        duplicated onto the new desk.
        """
        # clone current desk
        new_desk = copy.deepcopy(self)
        new_desk.pk = None
        # set label
        new_desk.label = label or _('Copy of %s') % self.label
        if agenda_target:
            new_desk.agenda = agenda_target
        else:
            new_desk.slug = None
        # store new desk
        new_desk.save()

        # clone related objects
        for time_period in self.timeperiod_set.all():
            time_period.duplicate(desk_target=new_desk)
        for time_period_exception in self.timeperiodexception_set.all():
            time_period_exception.duplicate(desk_target=new_desk)
        for time_period_exception_source in self.timeperiodexceptionsource_set.all():
            time_period_exception_source.duplicate(desk_target=new_desk)

        return new_desk

    def get_exceptions_within_two_weeks(self):
        """Return the exceptions to display for the coming two weeks.

        Exceptions that are not over yet are kept when they end within 14
        days or have already started.  When there is none, the first
        exception that has not started yet is returned alone.  Returns an
        empty list when nothing matches.
        """
        # timeperiodexception_set.all() is prefetched, do not filter the querysets
        # default ordering: start_datetime
        # a single aware "now" is used for every comparison; adding the
        # delta to an aware datetime cannot hit an ambiguous or
        # non-existent local time, unlike make_aware() on a naive value
        current = now()
        in_two_weeks = current + datetime.timedelta(days=14)
        all_exceptions = self.timeperiodexception_set.all()
        exceptions = []
        for exception in all_exceptions:
            if exception.end_datetime < current:
                # exception ends in the past, skip it
                continue
            if exception.end_datetime <= in_two_weeks or exception.start_datetime < current:
                # ends in less than 2 weeks, or has already started
                exceptions.append(exception)
        if exceptions:
            return exceptions
        # if none found within the 2 coming weeks, return the next one
        for exception in all_exceptions:
            if exception.start_datetime >= current:
                # first exception that does not start in the past
                return [exception]
        return []

    def are_all_exceptions_displayed(self):
        """Tell whether get_exceptions_within_two_weeks() returns every exception."""
        # timeperiodexception_set.all() is prefetched, do not filter the queryset
        in_two_weeks = self.get_exceptions_within_two_weeks()
        return len(self.timeperiodexception_set.all()) == len(in_two_weeks)

    def import_timeperiod_exceptions_from_remote_ics(self, ics_url, source=None):
        """Download an ICS calendar and import its events as exceptions.

        A new ``TimePeriodExceptionSource`` is built for ``ics_url`` when
        ``source`` is not given.

        Returns the number of exceptions created.
        Raises ICSError when the download fails or the content is invalid.
        """
        try:
            # NOTE(review): no timeout is passed, a stalled server blocks this call
            response = requests.get(ics_url, proxies=settings.REQUESTS_PROXIES)
            response.raise_for_status()
        except requests.HTTPError as e:
            raise ICSError(
                _('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).')
                % {'url': ics_url, 'status_code': e.response.status_code}
            )
        except requests.RequestException as e:
            raise ICSError(
                _('Failed to retrieve remote calendar (%(url)s, %(exception)s).')
                % {'url': ics_url, 'exception': e}
            )

        if source is None:
            source = TimePeriodExceptionSource(desk=self, ics_url=ics_url)
        try:
            # override response encoding received in HTTP headers as it may
            # often be missing and defaults to iso-8859-15.
            response.content.decode('utf-8')
            response.encoding = 'utf-8'
        except UnicodeDecodeError:
            # not valid UTF-8: keep the encoding chosen by requests
            pass
        return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text)

    def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None):
        """Import the events of an ICS file as exceptions.

        A new ``TimePeriodExceptionSource`` holding the file is built when
        ``source`` is not given.

        Returns the number of exceptions created.
        Raises ICSError when the content is invalid.
        """
        if source is None:
            source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name, ics_file=ics_file)
        return self._import_timeperiod_exceptions_from_ics(source=source, data=force_text(ics_file.read()))

    @staticmethod
    def _ics_value_to_aware_datetime(value):
        """Turn an ICS date or datetime value into an aware datetime."""
        if not isinstance(value, datetime.datetime):
            # plain date: use midnight of that day
            value = datetime.datetime.combine(value, datetime.datetime.min.time())
        if not is_aware(value):
            value = make_aware(value)
        return value

    def _import_timeperiod_exceptions_from_ics(self, source, data, recurring_days=600):
        """Replace the exceptions of ``source`` with the events found in ``data``.

        ``data`` is the ICS content as text.  ``source`` is saved if it is
        new, its previous exceptions are deleted and one exception is
        created per event.  Recurring events are expanded from their start
        up to ``recurring_days`` days from now, skipping occurrences that
        are already over.  Everything runs in a single transaction.

        Returns the number of exceptions created.
        Raises ICSError when the content cannot be parsed, contains no
        event, or contains an event without start date.
        """
        try:
            parsed = vobject.readOne(data)
        except vobject.base.ParseError:
            raise ICSError(_('File format is invalid.'))

        total_created = 0

        if not parsed.contents.get('vevent'):
            raise ICSError(_('The file doesn\'t contain any events.'))

        with transaction.atomic():
            if source.pk is None:
                source.save()
            # delete old exceptions related to this source
            source.timeperiodexception_set.all().delete()
            # create new exceptions
            update_datetime = now()
            for vevent in parsed.contents.get('vevent', []):
                if 'summary' in vevent.contents:
                    summary = force_text(vevent.contents['summary'][0].value)
                else:
                    summary = _('Exception')
                try:
                    start_dt = self._ics_value_to_aware_datetime(vevent.dtstart.value)
                except AttributeError:
                    raise ICSError(_('Event "%s" has no start date.') % summary)
                try:
                    end_dt = self._ics_value_to_aware_datetime(vevent.dtend.value)
                    duration = end_dt - start_dt
                except AttributeError:
                    try:
                        duration = vevent.duration.value
                        end_dt = start_dt + duration
                    except AttributeError:
                        # events without end date are considered as ending the same day
                        end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time()))
                        duration = end_dt - start_dt

                event = {
                    'start_datetime': start_dt,
                    'end_datetime': end_dt,
                    'label': summary,
                    'desk': self,
                    'source': source,
                    'recurrence_id': 0,
                }

                # read the property once and reuse the result for all checks below
                rruleset = vevent.rruleset
                if not rruleset:
                    # classical event
                    TimePeriodException.objects.create(**event)
                    total_created += 1
                    continue

                # only the first occurrence is needed to know whether the
                # recurrence is empty, count() would walk all of it
                first_occurrence = next(iter(rruleset), None)
                if first_occurrence is None:
                    continue

                # recurring event until recurring_days in the future
                from_dt = start_dt
                until_dt = update_datetime + datetime.timedelta(days=recurring_days)
                if not is_aware(first_occurrence):
                    # occurrences are naive: compare them with naive bounds
                    from_dt = make_naive(from_dt)
                    until_dt = make_naive(until_dt)
                occurrences = rruleset.between(from_dt, until_dt, inc=True)
                for i, occurrence_start in enumerate(occurrences):
                    # recompute start and end from occurrences and duration
                    if not is_aware(occurrence_start):
                        occurrence_start = make_aware(occurrence_start)
                    occurrence_end = occurrence_start + duration
                    event['recurrence_id'] = i
                    event['start_datetime'] = occurrence_start
                    event['end_datetime'] = occurrence_end
                    if occurrence_end >= update_datetime:
                        TimePeriodException.objects.create(**event)
                        total_created += 1

        return total_created

    def get_opening_hours(self, date):
        """Return the opening ranges of the desk on ``date``.

        The time periods set on the weekday of ``date`` are collected,
        then the exceptions overlapping that day are subtracted.

        Returns a list of ``OpeningHour``.
        """
        openslots = IntervalSet()
        for timeperiod in self.timeperiod_set.all():
            # timeperiod_set.all() are prefetched, do not filter in queryset
            if timeperiod.weekday != date.weekday():
                continue
            start_datetime = make_aware(datetime.datetime.combine(date, timeperiod.start_time))
            end_datetime = make_aware(datetime.datetime.combine(date, timeperiod.end_time))
            openslots.add(start_datetime, end_datetime)

        aware_date = make_aware(datetime.datetime(date.year, date.month, date.day))
        exceptions = IntervalSet()
        aware_next_date = aware_date + datetime.timedelta(days=1)
        for exception in self.timeperiodexception_set.all():
            # timeperiodexception_set.all() are prefetched, do not filter in queryset
            if exception.end_datetime < aware_date:
                continue
            if exception.start_datetime > aware_next_date:
                continue
            exceptions.add(exception.start_datetime, exception.end_datetime)

        return [OpeningHour(*time_range) for time_range in (openslots - exceptions)]
|
|
|
|
|
|
class Resource(models.Model):
    """A labelled resource identified by a globally unique slug."""

    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
    label = models.CharField(_('Label'), max_length=150)
    description = models.TextField(_('Description'), blank=True, help_text=_('Optional description.'))

    class Meta:
        ordering = ['label']

    def __str__(self):
        return self.label

    @property
    def base_slug(self):
        """Slugified label."""
        return slugify(self.label)

    def save(self, *args, **kwargs):
        """Save the resource, generating its slug first when it is empty."""
        self.slug = self.slug or generate_slug(self)
        super().save(*args, **kwargs)
|
|
|
|
|
|
def ics_directory_path(instance, filename):
    """Build the upload path of an ICS file: 'ics/<random uuid>/<filename>'.

    ``instance`` is not used.
    """
    random_directory = uuid.uuid4()
    return 'ics/%s/%s' % (random_directory, filename)
|
|
|
|
|
|
class TimePeriodExceptionSource(models.Model):
    """Origin of imported time period exceptions: an ICS file or an ICS URL."""

    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
    # name of the imported file, NULL when not set
    ics_filename = models.CharField(null=True, max_length=256)
    # stored under 'ics/<random uuid>/<filename>', see ics_directory_path()
    ics_file = models.FileField(upload_to=ics_directory_path, blank=True, null=True)
    # remote calendar address, NULL when not set
    ics_url = models.URLField(null=True, max_length=500)

    def __str__(self):
        if self.ics_filename is not None:
            return self.ics_filename
        # both fields are nullable and __str__ must never return None
        return self.ics_url or ''

    def duplicate(self, desk_target=None):
        """Store and return a copy of this source.

        The copy is attached to ``desk_target`` (default: the same desk).
        When a file is stored on this source its content is copied to a
        new file, so that both sources do not share the same file.
        """
        # clone current source
        new_source = copy.deepcopy(self)
        new_source.pk = None
        # set desk
        new_source.desk = desk_target or self.desk
        # set ics_file
        if self.ics_file:
            # binary mode: copy the bytes as they are, whatever the locale
            # encoding and without newline translation
            with open(self.ics_file.path, 'rb') as ics_file:
                new_source.ics_file.save(self.ics_filename, ics_file, save=False)
        # store new source
        new_source.save()

        return new_source
|
|
|
|
|
|
class TimePeriodException(models.Model):
    """A time range during which a desk is closed."""

    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
    # set for exceptions imported from an ICS source, NULL otherwise
    source = models.ForeignKey(TimePeriodExceptionSource, on_delete=models.CASCADE, null=True)
    label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
    start_datetime = models.DateTimeField(_('Exception start time'))
    end_datetime = models.DateTimeField(_('Exception end time'))
    update_datetime = models.DateTimeField(auto_now=True)
    # index of the occurrence for exceptions imported from a recurring event
    recurrence_id = models.PositiveIntegerField(_('Recurrence ID'), default=0)

    class Meta:
        ordering = ['start_datetime']

    def __str__(self):
        """Describe the exception as 'start → end', prefixed by its label if any.

        The time part is left out when both bounds are at midnight, and a
        single full day is shown as one date.
        """
        if is_midnight(self.start_datetime) and is_midnight(self.end_datetime):
            # if both dates are at midnight don't include the time part
            if self.end_datetime == self.start_datetime + datetime.timedelta(days=1):
                # a single day
                exc_repr = u'%s' % date_format(localtime(self.start_datetime), 'SHORT_DATE_FORMAT')
            else:
                exc_repr = u'%s → %s' % (
                    date_format(localtime(self.start_datetime), 'SHORT_DATE_FORMAT'),
                    date_format(localtime(self.end_datetime), 'SHORT_DATE_FORMAT'),
                )
        else:
            if localtime(self.start_datetime).date() == localtime(self.end_datetime).date():
                # same day
                exc_repr = u'%s → %s' % (
                    date_format(localtime(self.start_datetime), 'SHORT_DATETIME_FORMAT'),
                    date_format(localtime(self.end_datetime), 'TIME_FORMAT'),
                )
            else:
                exc_repr = u'%s → %s' % (
                    date_format(localtime(self.start_datetime), 'SHORT_DATETIME_FORMAT'),
                    date_format(localtime(self.end_datetime), 'SHORT_DATETIME_FORMAT'),
                )

        if self.label:
            exc_repr = u'%s (%s)' % (self.label, exc_repr)

        return exc_repr

    def has_booking_within_time_slot(self):
        """Tell whether a booked event of the desk starts within the exception.

        Only events with at least one booking that is not cancelled are
        considered.  The start of the exception is inclusive, its end is
        exclusive.  Returns False when a bound is missing.
        """
        if not (self.start_datetime and self.end_datetime):
            # incomplete time period, can't tell
            return False
        # the range test is done by the database, instead of loading every
        # booked event of the desk (once per booking) to compare in Python
        return Event.objects.filter(
            desk=self.desk,
            booking__isnull=False,
            booking__cancellation_datetime__isnull=True,
            start_datetime__gte=self.start_datetime,
            start_datetime__lt=self.end_datetime,
        ).exists()

    @classmethod
    def import_json(cls, data):
        """Build an unsaved exception from ``data``.

        Values of keys ending with '_datetime' are parsed as
        'YYYY-MM-DD HH:MM:SS' and made aware; ``data`` is modified in
        place by this conversion before going through
        ``clean_import_data()``.
        """

        def import_datetime(s):
            '''Import datetime as a naive ISO8601 serialization'''
            return make_aware(datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S'))

        # values of existing keys are replaced, the dict size does not
        # change so updating it while iterating is safe
        for k, v in data.items():
            if k.endswith('_datetime'):
                data[k] = import_datetime(v)
        data = clean_import_data(cls, data)
        return cls(**data)

    def export_json(self):
        """Return the exception as a dict, datetimes as naive 'YYYY-MM-DD HH:MM:SS'."""

        def export_datetime(dt):
            '''Export datetime as a naive ISO8601 serialization'''
            return make_naive(dt).strftime('%Y-%m-%d %H:%M:%S')

        return {
            'label': self.label,
            'start_datetime': export_datetime(self.start_datetime),
            'end_datetime': export_datetime(self.end_datetime),
            'recurrence_id': self.recurrence_id,
            'update_datetime': export_datetime(self.update_datetime),
        }

    def duplicate(self, desk_target=None):
        """Store and return a copy attached to ``desk_target`` (default: same desk)."""
        # clone current exception
        new_exception = copy.deepcopy(self)
        new_exception.pk = None
        # set desk
        new_exception.desk = desk_target or self.desk
        # store new exception
        new_exception.save()

        return new_exception

    def as_interval(self):
        '''Simplify insertion into IntervalSet'''
        return Interval(self.start_datetime, self.end_datetime)
|