chrono/chrono/agendas/models.py

3715 lines
143 KiB
Python

# chrono - agendas system
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import copy
import dataclasses
import datetime
import functools
import itertools
import math
import sys
import uuid
from contextlib import contextmanager
import requests
import vobject
from dateutil.relativedelta import SU, relativedelta
from dateutil.rrule import DAILY, WEEKLY, rrule, rruleset
from django.conf import settings
from django.contrib.auth.models import Group
from django.contrib.humanize.templatetags.humanize import ordinal
from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import connection, models, transaction
from django.db.models import (
Count,
Exists,
ExpressionWrapper,
F,
Func,
IntegerField,
Max,
OuterRef,
Prefetch,
Q,
Subquery,
Value,
)
from django.db.models.functions import Cast, Coalesce, Concat, ExtractWeek, ExtractWeekDay, JSONObject
from django.template import (
Context,
RequestContext,
Template,
TemplateSyntaxError,
VariableDoesNotExist,
engines,
)
from django.urls import reverse
from django.utils import functional
from django.utils.dates import WEEKDAYS
from django.utils.encoding import force_str
from django.utils.formats import date_format
from django.utils.functional import cached_property
from django.utils.html import escape
from django.utils.module_loading import import_string
from django.utils.safestring import mark_safe
from django.utils.text import slugify
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
from django.utils.translation import ngettext, pgettext_lazy
from chrono.interval import Interval, IntervalSet
from chrono.utils.date import get_weekday_index
from chrono.utils.db import ArraySubquery, SumCardinality
from chrono.utils.misc import AgendaImportError, ICSError, clean_import_data, generate_slug
from chrono.utils.publik_urls import translate_from_publik_url
from chrono.utils.requests_wrapper import requests as requests_wrapper
from chrono.utils.timezone import is_aware, localtime, make_aware, make_naive, now, utc
# Kinds of agenda, as (stored value, translated label) pairs; used as the
# choices of Agenda.kind.
AGENDA_KINDS = (
    ('events', _('Events')),
    ('meetings', _('Meetings')),
    ('virtual', _('Virtual')),
)
# Views an agenda can open on, as (stored value, translated label) pairs; used
# as the choices of Agenda.default_view.
AGENDA_VIEWS = (
    ('day', _('Day view')),
    ('week', _('Week view')),
    ('month', _('Month view')),
    ('open_events', _('Open events')),
)
# Translated plural weekday names, keyed by weekday index (0 is Monday).
WEEKDAYS_PLURAL = {
    0: _('Mondays'),
    1: _('Tuesdays'),
    2: _('Wednesdays'),
    3: _('Thursdays'),
    4: _('Fridays'),
    5: _('Saturdays'),
    6: _('Sundays'),
}
# (weekday index, translated two-letter abbreviation) pairs, Monday first.
WEEKDAY_CHOICES = [
    (0, _('Mo')),
    (1, _('Tu')),
    (2, _('We')),
    (3, _('Th')),
    (4, _('Fr')),
    (5, _('Sa')),
    (6, _('Su')),
]
def is_midnight(dtime):
    """Tell whether ``dtime``, once passed through ``localtime``, is at 00:00.

    Only the hour and the minute are inspected; seconds are not looked at.
    """
    local_dtime = localtime(dtime)
    return (local_dtime.hour, local_dtime.minute) == (0, 0)
def validate_not_digit(value):
    """Validator rejecting values made only of digits.

    Raises:
        ValidationError: when ``value.isdigit()`` is true.
    """
    if not value.isdigit():
        return
    raise ValidationError(_('This value cannot be a number.'))
def django_template_validator(value):
    """Validator checking that ``value`` compiles with the 'django' template engine.

    Raises:
        ValidationError: when compiling raises TemplateSyntaxError.
    """
    django_engine = engines['django']
    try:
        django_engine.from_string(value)
    except TemplateSyntaxError as exc:
        message = _('syntax error: %s') % exc
        raise ValidationError(message)
def event_template_validator(value):
    """Validator rendering ``value`` as a template against a sample event.

    The template is rendered with an unsaved Event exposed as ``event``.

    Raises:
        ValidationError: when rendering raises VariableDoesNotExist or
            TemplateSyntaxError.
    """
    sample_event = Event(
        start_datetime=now(),
        publication_datetime=now(),
        recurrence_end_date=now().date(),
        places=1,
        duration=1,
    )
    rendering_context = Context({'event': sample_event})
    try:
        Template(value).render(rendering_context)
    except (VariableDoesNotExist, TemplateSyntaxError) as exc:
        message = _('syntax error: %s') % exc
        raise ValidationError(message)
def booking_template_validator(value):
    """Validator rendering ``value`` as a template against a sample booking.

    The template is rendered with an unsaved Booking (attached to an unsaved
    Event) exposed as ``booking``. Unlike syntax errors, a
    VariableDoesNotExist raised while rendering is tolerated.

    Raises:
        ValidationError: when rendering raises TemplateSyntaxError.
    """
    sample_event = Event(
        start_datetime=now(),
        publication_datetime=now(),
        recurrence_end_date=now().date(),
        places=1,
        duration=1,
    )
    sample_booking = Booking(event=sample_event)
    rendering_context = Context({'booking': sample_booking})
    try:
        Template(value).render(rendering_context)
    except VariableDoesNotExist:
        # unresolved variables are accepted here
        return
    except TemplateSyntaxError as exc:
        raise ValidationError(_('syntax error: %s') % exc)
class Agenda(models.Model):
# --- identity ---
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
kind = models.CharField(_('Kind'), max_length=20, choices=AGENDA_KINDS, default='events')
# --- booking window ---
# Both delays are nullable; save() gives them a value on non-virtual agendas.
minimal_booking_delay = models.PositiveIntegerField(
    _('Minimal booking delay (in days)'),
    default=None,
    null=True,
    blank=True,
    validators=[MaxValueValidator(10000)],
)
minimal_booking_delay_in_working_days = models.BooleanField(
    _('Minimal booking delay in working days'),
    default=False,
)
maximal_booking_delay = models.PositiveIntegerField(
    _('Maximal booking delay (in days)'),
    default=None,
    null=True,
    blank=True,
    validators=[MaxValueValidator(10000)],
)  # save() falls back to eight weeks (8 * 7 days) on non-virtual agendas
anonymize_delay = models.PositiveIntegerField(
    _('Anonymize delay (in days)'),
    default=None,
    null=True,
    blank=True,
    validators=[MinValueValidator(30), MaxValueValidator(1000)],
    help_text=_('User data will be kept for the specified number of days passed the booking date.'),
)
# --- virtual agendas: members are linked through VirtualMember ---
real_agendas = models.ManyToManyField(
    'self',
    related_name='virtual_agendas',
    symmetrical=False,
    through='VirtualMember',
    through_fields=('virtual_agenda', 'real_agenda'),
)
# --- permissions: groups compared against the user's groups in
# can_be_managed() / can_be_viewed() ---
edit_role = models.ForeignKey(
    Group,
    blank=True,
    null=True,
    default=None,
    related_name='+',
    verbose_name=_('Edit Role'),
    on_delete=models.SET_NULL,
)
view_role = models.ForeignKey(
    Group,
    blank=True,
    null=True,
    default=None,
    related_name='+',
    verbose_name=_('View Role'),
    on_delete=models.SET_NULL,
)
resources = models.ManyToManyField('Resource')
category = models.ForeignKey(
    'Category', verbose_name=_('Category'), blank=True, null=True, on_delete=models.SET_NULL
)
# left empty, save() picks 'month' for events agendas and 'day' otherwise
default_view = models.CharField(_('Default view'), max_length=20, choices=AGENDA_VIEWS)
# stored as a template string, rendered by get_booking_form_url()
booking_form_url = models.CharField(
    _('Booking form URL'), max_length=200, blank=True, validators=[django_template_validator]
)
desk_simple_management = models.BooleanField(default=False)
# --- booking check options ---
mark_event_checked_auto = models.BooleanField(
    _('Automatically mark event as checked when all bookings have been checked'), default=False
)
disable_check_update = models.BooleanField(
    _('Prevent the check of bookings when event was marked as checked'), default=False
)
enable_check_for_future_events = models.BooleanField(
    _('Enable the check of bookings when event has not passed'), default=False
)
# comma separated keys, split by get_booking_check_filters()
booking_check_filters = models.CharField(
    _('Filters'),
    max_length=250,
    blank=True,
    help_text=_('Comma separated list of keys defined in extra_data.'),
)
# --- display templates ---
booking_user_block_template = models.TextField(
    _('User block template'),
    blank=True,
    validators=[django_template_validator],
)
booking_extra_user_block_template = models.TextField(
    _('Extra user block template'),
    blank=True,
    validators=[django_template_validator],
    help_text=_('Displayed on check page'),
)
event_display_template = models.CharField(
    _('Event display template'),
    max_length=256,
    blank=True,
    validators=[event_template_validator],
    help_text=_(
        'By default event labels will be displayed to users. '
        'This allows for a custom template to include additional informations. '
        'For example, "{{ event.label }} - {{ event.start_datetime }}" will show event datetime after label. '
        'Available variables: event.label (label), event.start_datetime (start date/time), event.places (places), '
        'event.remaining_places (remaining places), event.duration (duration), event.pricing (pricing).'
    ),
)
events_type = models.ForeignKey(
    'agendas.EventsType',
    verbose_name=_('Events type'),
    on_delete=models.CASCADE,
    related_name='agendas',
    null=True,
    blank=True,
)
# when set, replaces the time part of min_booking_datetime and
# max_booking_datetime
minimal_booking_time = models.TimeField(
    verbose_name=_('Minimal booking time'),
    default=datetime.time(0, 0, 0),  # booking is possible starting and finishing at 00:00
    help_text=_(
        'Ex.: 08:00:00. If left empty, available events will be those that are later than the current time.'
    ),
    null=True,
    blank=True,
)
class Meta:
    # agendas are listed by label by default
    ordering = ['label']
def __str__(self):
    """Return the agenda label."""
    return self.label
def save(self, *args, **kwargs):
    """Fill in missing slug, booking delays and default view, then save.

    * an empty slug is generated with ``generate_slug``;
    * on non-virtual agendas, unset booking delays become 1 day (minimal)
      and eight weeks (maximal);
    * an empty default view becomes 'month' for events agendas and 'day'
      for the other kinds.
    """
    if not self.slug:
        self.slug = generate_slug(self)

    is_virtual = self.kind == 'virtual'
    if not is_virtual and self.minimal_booking_delay is None:
        self.minimal_booking_delay = 1
    if not is_virtual and self.maximal_booking_delay is None:
        self.maximal_booking_delay = 8 * 7

    if not self.default_view:
        self.default_view = 'month' if self.kind == 'events' else 'day'

    super().save(*args, **kwargs)
@property
def base_slug(self):
    """Slugified version of the label."""
    return slugify(self.label)
def get_absolute_url(self):
    """Return the URL of the 'chrono-manager-agenda-view' route for this agenda."""
    return reverse('chrono-manager-agenda-view', kwargs={'pk': self.id})
def get_settings_url(self):
    """Return the URL of the 'chrono-manager-agenda-settings' route for this agenda."""
    return reverse('chrono-manager-agenda-settings', kwargs={'pk': self.id})
def get_lingo_url(self):
    """Return the pricing URL of this agenda in lingo.

    The first service listed under ``settings.KNOWN_SERVICES['lingo']`` is
    used; ``None`` is returned when no lingo service is configured.
    """
    lingo_services = settings.KNOWN_SERVICES.get('lingo')
    if not lingo_services:
        return None
    first_service = list(settings.KNOWN_SERVICES['lingo'].values())[0]
    base_url = first_service.get('url') or ''
    return '%smanage/pricing/agenda/%s/' % (base_url, self.slug)
def can_be_managed(self, user):
    """Tell whether ``user`` may manage this agenda.

    Staff users always can; other users must belong to the edit role group.
    """
    if user.is_staff:
        return True
    user_group_ids = [group.id for group in user.groups.all()]
    if self.edit_role_id in user_group_ids:
        return True
    return False
def can_be_viewed(self, user):
    """Tell whether ``user`` may view this agenda.

    Anyone able to manage the agenda can view it; other users must belong to
    the view role group.
    """
    if self.can_be_managed(user):
        return True
    user_group_ids = [group.id for group in user.groups.all()]
    if self.view_role_id in user_group_ids:
        return True
    return False
def accept_meetings(self):
    """Tell whether meetings can be booked on this agenda.

    A virtual agenda accepts meetings as long as none of its real agendas is
    of another kind than 'meetings'; any other agenda must itself be of kind
    'meetings'.
    """
    if self.kind != 'virtual':
        return self.kind == 'meetings'
    non_meetings_members = self.real_agendas.filter(~Q(kind='meetings'))
    return not non_meetings_members.exists()
def get_real_agendas(self):
    """Return the real agendas behind this one.

    That is the members queryset for a virtual agenda, and a one-element
    list holding the agenda itself otherwise.
    """
    return self.real_agendas.all() if self.kind == 'virtual' else [self]
@cached_property
def cached_meetingtypes(self):
    """List of the agenda meeting types (see iter_meetingtypes), computed once per instance."""
    return list(self.iter_meetingtypes())
def iter_meetingtypes(self, excluded_agenda=None):
    """Expose the meeting types of the agenda.

    On a real agenda this is the queryset of its non-deleted meeting types,
    ordered by slug.

    On a virtual agenda, transient (unsaved) MeetingType instances are built
    for every (slug, duration, label) combination shared by all of its real
    agendas. When ``excluded_agenda`` is given, that real agenda is left out
    of the computation.
    """
    if self.kind != 'virtual':
        return self.meetingtype_set.filter(deleted=False).all().order_by('slug')

    candidates = MeetingType.objects.filter(agenda__virtual_agendas__in=[self], deleted=False)
    members = self.real_agendas
    if excluded_agenda:
        candidates = candidates.exclude(agenda=excluded_agenda)
        members = members.exclude(pk=excluded_agenda.pk)

    # keep the combinations found as many times as there are real agendas
    shared = (
        candidates.values('slug', 'duration', 'label')
        .annotate(total=Count('*'))
        .filter(total=members.count())
    )
    return [
        MeetingType(duration=row['duration'], label=row['label'], slug=row['slug'])
        for row in shared.order_by('slug')
    ]
def get_meetingtype(self, id_=None, slug=None):
    """Return the meeting type matching ``id_`` or ``slug``.

    On a virtual agenda the given value (``id_`` taking precedence) is
    compared to the slug of the cached transient meeting types. On other
    agendas a non-deleted MeetingType of this agenda is fetched, by id when
    ``id_`` is given and by slug otherwise.

    Raises:
        MeetingType.DoesNotExist: when nothing matches.
    """
    match = id_ or slug
    assert match, 'an identifier or a slug should be specified'

    if self.kind == 'virtual':
        for candidate in self.cached_meetingtypes:
            if candidate.slug == match:
                return candidate
        raise MeetingType.DoesNotExist()

    if id_:
        return MeetingType.objects.get(id=id_, agenda=self, deleted=False)
    return MeetingType.objects.get(slug=slug, agenda=self, deleted=False)
def get_virtual_members(self):
    """Return the VirtualMember rows whose virtual agenda is this agenda."""
    return VirtualMember.objects.filter(virtual_agenda=self)
def get_max_meeting_duration(self):
    """Return the longest duration among the cached meeting types.

    Raises:
        ValueError: when the agenda has no meeting type.
    """
    durations = (meeting_type.duration for meeting_type in self.cached_meetingtypes)
    longest = max(durations)
    return longest
def get_base_meeting_duration(self):
    """Return the greatest common divisor of the meeting type durations.

    Raises:
        ValueError: when there is no meeting type, or when the computed
            divisor is 0.
    """
    durations = [meeting_type.duration for meeting_type in self.cached_meetingtypes]
    if not durations:
        raise ValueError()
    base_duration = functools.reduce(math.gcd, durations)
    if base_duration == 0:
        raise ValueError()
    return base_duration
def export_json(self):
    """Serialize the agenda, and its kind-specific related objects, as a dict.

    Common keys come first, then reminder settings when the agenda has some,
    then the keys specific to the agenda kind:

    * events: booking form URL, non-recurrence events, notifications
      settings (when present), exceptions desk and check/display options;
    * meetings: non-deleted meeting types, desks and the simple management
      flag;
    * virtual: excluded time periods and real agendas (slug and kind).
    """
    category = self.category
    view_role = self.view_role
    edit_role = self.edit_role
    exported = {
        'label': self.label,
        'slug': self.slug,
        'kind': self.kind,
        'category': category.slug if category else None,
        'minimal_booking_delay': self.minimal_booking_delay,
        'maximal_booking_delay': self.maximal_booking_delay,
        'permissions': {
            'view': view_role.name if view_role else None,
            'edit': edit_role.name if edit_role else None,
        },
        'resources': [resource.slug for resource in self.resources.all()],
        'default_view': self.default_view,
    }
    if hasattr(self, 'reminder_settings'):
        exported['reminder_settings'] = self.reminder_settings.export_json()

    if self.kind == 'events':
        exported['booking_form_url'] = self.booking_form_url
        # recurrences are not exported, only their primary event is
        exported['events'] = [
            event.export_json() for event in self.event_set.filter(primary_event__isnull=True)
        ]
        if hasattr(self, 'notifications_settings'):
            exported['notifications_settings'] = self.notifications_settings.export_json()
        events_type = self.events_type
        exported.update(
            {
                'exceptions_desk': self.desk_set.get().export_json(),
                'minimal_booking_delay_in_working_days': self.minimal_booking_delay_in_working_days,
                'booking_user_block_template': self.booking_user_block_template,
                'booking_check_filters': self.booking_check_filters,
                'event_display_template': self.event_display_template,
                'mark_event_checked_auto': self.mark_event_checked_auto,
                'events_type': events_type.slug if events_type else None,
            }
        )
    elif self.kind == 'meetings':
        exported.update(
            {
                'meetingtypes': [
                    meeting_type.export_json()
                    for meeting_type in self.meetingtype_set.filter(deleted=False)
                ],
                'desks': [desk.export_json() for desk in self.desk_set.all()],
                'desk_simple_management': self.desk_simple_management,
            }
        )
    elif self.kind == 'virtual':
        exported.update(
            {
                'excluded_timeperiods': [
                    timeperiod.export_json() for timeperiod in self.excluded_timeperiods.all()
                ],
                'real_agendas': [
                    {'slug': member.slug, 'kind': member.kind} for member in self.real_agendas.all()
                ],
            }
        )
    return exported
@classmethod
def import_json(cls, data, overwrite=False):
    """Create or update an agenda, and its related objects, from exported data.

    ``data`` is a dict shaped like the output of ``export_json``; it is deep
    copied, so the caller's dict is left untouched. The agenda is looked up
    by slug and created when missing. With ``overwrite`` set, existing
    related objects (reminder settings, then events and notifications
    settings, or meeting types and desks, or excluded time periods and
    virtual members, depending on the kind) are deleted before being
    imported again.

    Returns:
        tuple: ``(created, agenda)``, ``created`` telling whether the agenda
        itself was created by this call.

    Raises:
        KeyError: when a mandatory key ('permissions', 'kind', 'slug' or a
            kind-specific key) is missing from ``data``.
        AgendaImportError: when a resource, an events type or a real agenda
            referenced by ``data`` does not exist, or when a virtual member
            is not valid.
    """
    data = copy.deepcopy(data)
    permissions = data.pop('permissions') or {}
    reminder_settings = data.pop('reminder_settings', None)
    # take related objects out of data, only agenda fields must remain
    if data['kind'] == 'events':
        events = data.pop('events')
        notifications_settings = data.pop('notifications_settings', None)
        exceptions_desk = data.pop('exceptions_desk', None)
    elif data['kind'] == 'meetings':
        meetingtypes = data.pop('meetingtypes')
        desks = data.pop('desks')
    elif data['kind'] == 'virtual':
        excluded_timeperiods = data.pop('excluded_timeperiods')
        real_agendas = data.pop('real_agendas')
    for permission in ('view', 'edit'):
        if permissions.get(permission):
            data[permission + '_role'] = Group.objects.get(name=permissions[permission])
    resources_slug = data.pop('resources', [])
    resources_by_slug = {r.slug: r for r in Resource.objects.filter(slug__in=resources_slug)}
    for resource_slug in resources_slug:
        if resource_slug not in resources_by_slug:
            raise AgendaImportError(_('Missing "%s" resource') % resource_slug)
    data = clean_import_data(cls, data)
    # applied at the very end, once desks have been imported
    desk_simple_management = data.pop('desk_simple_management', None)
    if data.get('category'):
        try:
            data['category'] = Category.objects.get(slug=data['category'])
        except Category.DoesNotExist:
            # unknown categories are silently ignored
            del data['category']
    if data.get('events_type'):
        try:
            data['events_type'] = EventsType.objects.get(slug=data['events_type'])
        except EventsType.DoesNotExist:
            raise AgendaImportError(_('Missing "%s" events type') % data['events_type'])
    agenda, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
    if overwrite:
        AgendaReminderSettings.objects.filter(agenda=agenda).delete()
    if reminder_settings:
        reminder_settings['agenda'] = agenda
        AgendaReminderSettings.import_json(reminder_settings)
    if data['kind'] == 'events':
        if overwrite:
            Event.objects.filter(agenda=agenda).delete()
            AgendaNotificationsSettings.objects.filter(agenda=agenda).delete()
        for event_data in events:
            event_data['agenda'] = agenda
            Event.import_json(event_data)
        if notifications_settings:
            notifications_settings['agenda'] = agenda
            AgendaNotificationsSettings.import_json(notifications_settings)
        if exceptions_desk:
            exceptions_desk['agenda'] = agenda
            Desk.import_json(exceptions_desk)
    elif data['kind'] == 'meetings':
        if overwrite:
            MeetingType.objects.filter(agenda=agenda).delete()
            Desk.objects.filter(agenda=agenda).delete()
        for type_data in meetingtypes:
            type_data['agenda'] = agenda
            MeetingType.import_json(type_data)
        for desk in desks:
            desk['agenda'] = agenda
            Desk.import_json(desk)
        agenda.resources.set(resources_by_slug.values())
    elif data['kind'] == 'virtual':
        if overwrite:
            TimePeriod.objects.filter(agenda=agenda).delete()
            VirtualMember.objects.filter(virtual_agenda=agenda).delete()
        for excluded_timeperiod in excluded_timeperiods:
            excluded_timeperiod['agenda'] = agenda
            TimePeriod.import_json(excluded_timeperiod)
        for real_agenda in real_agendas:
            try:
                real_agenda = Agenda.objects.get(slug=real_agenda['slug'], kind=real_agenda['kind'])
            except Agenda.DoesNotExist:
                raise AgendaImportError(_('The real agenda "%s" does not exist.') % real_agenda['slug'])
            try:
                # do not reuse the name `created` here: it holds the flag of
                # the agenda itself, which is returned to the caller
                vm, dummy = VirtualMember.objects.get_or_create(
                    virtual_agenda=agenda, real_agenda=real_agenda
                )
                vm.clean()
            except ValidationError as exc:
                raise AgendaImportError(' '.join(exc.messages))
    if data['kind'] == 'meetings' and desk_simple_management is not None:
        if desk_simple_management is True and not agenda.desk_simple_management:
            # only switch simple management on when desks allow it
            if agenda.is_available_for_simple_management():
                agenda.desk_simple_management = True
                agenda.save()
        elif desk_simple_management is False and agenda.desk_simple_management:
            agenda.desk_simple_management = False
            agenda.save()
    return created, agenda
def duplicate(self, label=None):
    """Create and return a copy of the agenda and of its related objects.

    The copy is labelled ``label``, or 'Copy of <label>' by default, and is
    saved with an empty slug so that ``save`` generates a new one. Related
    objects are then duplicated onto the copy according to the agenda kind;
    reminder settings, when present, are duplicated for every kind.
    """
    clone = copy.deepcopy(self)
    clone.pk = None
    clone.label = label or _('Copy of %s') % self.label
    # an empty slug gets regenerated by save()
    clone.slug = None
    clone.save()

    if self.kind == 'meetings':
        for meeting_type in self.meetingtype_set.all():
            meeting_type.duplicate(agenda_target=clone)
        for desk in self.desk_set.all():
            desk.duplicate(agenda_target=clone)
        clone.resources.set(self.resources.all())
    elif self.kind == 'events':
        # standalone events first
        standalone_events = self.event_set.filter(recurrence_days__isnull=True, primary_event__isnull=True)
        for event in standalone_events:
            event.duplicate(agenda_target=clone)
        # then recurring events, each followed by its own recurrences
        for recurring_event in self.event_set.filter(recurrence_days__isnull=False):
            recurring_clone = recurring_event.duplicate(agenda_target=clone)
            for recurrence in self.event_set.filter(primary_event=recurring_event):
                recurrence.duplicate(agenda_target=clone, primary_event=recurring_clone)
        self.desk_set.get().duplicate(agenda_target=clone, reset_slug=False)
        if hasattr(self, 'notifications_settings'):
            self.notifications_settings.duplicate(agenda_target=clone)
    elif self.kind == 'virtual':
        for timeperiod in self.excluded_timeperiods.all():
            timeperiod.duplicate(agenda_target=clone)
        for member in self.real_agendas.all():
            VirtualMember.objects.create(virtual_agenda=clone, real_agenda=member)

    if hasattr(self, 'reminder_settings'):
        self.reminder_settings.duplicate(agenda_target=clone)
    return clone
def get_effective_time_periods(self, min_datetime=None, max_datetime=None):
    """Regroup time periods by desks.

    Time periods sharing the same begin and end times are yielded as a
    single SharedTimePeriod object, carrying a list of desks instead of only
    one desk. Only the date part of ``min_datetime`` and ``max_datetime`` is
    handed over to the kind-specific implementation.

    Raises:
        ValueError: when the agenda is neither of kind 'virtual' nor
            'meetings'.
    """
    min_date = None
    if min_datetime:
        min_date = min_datetime.date()
    max_date = None
    if max_datetime:
        max_date = max_datetime.date()

    if self.kind == 'virtual':
        return self.get_effective_time_periods_virtual(min_date, max_date)
    if self.kind == 'meetings':
        return self.get_effective_time_periods_meetings(min_date, max_date)
    raise ValueError('does not work with kind %r' % self.kind)
def get_effective_time_periods_meetings(self, min_date, max_date):
    """Yield the time periods of all desks of the agenda, regrouped.

    Time periods are converted into an Interval of WeekTime, which can be
    compared and regrouped using itertools.groupby; one SharedTimePeriod is
    yielded per interval, with the desks of every time period of the group.

    Args:
        min_date: when set, dated time periods before that date are left out.
        max_date: when set, dated time periods after that date are left out.

    Time periods without date are always kept.
    """
    time_periods = TimePeriod.objects.filter(desk__agenda=self)
    # QuerySet.filter() returns a new queryset: the result must be kept,
    # otherwise the date bounds are silently ignored
    if min_date:
        time_periods = time_periods.filter(Q(date__isnull=True) | Q(date__gte=min_date))
    if max_date:
        time_periods = time_periods.filter(Q(date__isnull=True) | Q(date__lte=max_date))
    grouped_time_periods = itertools.groupby(
        time_periods.prefetch_related('desk').order_by('weekday', 'start_time', 'end_time'),
        key=TimePeriod.as_weektime_interval,
    )
    for weektime_interval, group in grouped_time_periods:
        yield SharedTimePeriod.from_weektime_interval(
            weektime_interval,
            desks=[time_period.desk for time_period in group],
        )
def get_effective_time_periods_virtual(self, min_date, max_date):
    """Yield the time periods of all desks of all real agendas, regrouped.

    Time periods are converted into an Interval of WeekTime, which can be
    compared and regrouped using itertools.groupby. When the virtual agenda
    has excluded time periods, they are subtracted from every interval and
    one SharedTimePeriod is yielded per remaining piece.

    Args:
        min_date: when set, dated time periods before that date are left out.
        max_date: when set, dated time periods after that date are left out.

    Time periods without date are always kept.
    """
    time_periods = TimePeriod.objects.filter(desk__agenda__virtual_agendas=self)
    # QuerySet.filter() returns a new queryset: the result must be kept,
    # otherwise the date bounds are silently ignored
    if min_date:
        time_periods = time_periods.filter(Q(date__isnull=True) | Q(date__gte=min_date))
    if max_date:
        time_periods = time_periods.filter(Q(date__isnull=True) | Q(date__lte=max_date))
    closed_hours_by_days = IntervalSet.from_ordered(
        [
            time_period.as_weektime_interval()
            for time_period in self.excluded_timeperiods.order_by('weekday', 'start_time', 'end_time')
        ]
    )
    grouped_time_periods = itertools.groupby(
        time_periods.order_by('weekday', 'start_time', 'end_time').prefetch_related('desk'),
        key=lambda tp: tp.as_weektime_interval(),
    )
    for time_period_interval, group in grouped_time_periods:
        desks = [time_period.desk for time_period in group]
        if not closed_hours_by_days:
            yield SharedTimePeriod.from_weektime_interval(time_period_interval, desks=desks)
            continue
        # date and weekday indexes are read from the beginning of the
        # original interval and handed over to every remaining piece
        date = time_period_interval.begin.date
        weekday_indexes = time_period_interval.begin.weekday_indexes
        for weektime_interval in IntervalSet.simple(*time_period_interval) - closed_hours_by_days:
            yield SharedTimePeriod.from_weektime_interval(
                weektime_interval, desks=desks, date=date, weekday_indexes=weekday_indexes
            )
@functional.cached_property
def max_booking_datetime(self):
    """Latest bookable datetime, or ``None`` without maximal booking delay.

    It is the current local time plus ``maximal_booking_delay`` days, its
    time part being replaced by ``minimal_booking_time`` when that one is
    set.
    """
    delay = self.maximal_booking_delay
    if delay is None:
        return None
    # start from now, in local timezone, and add the delay
    limit = localtime(now()) + datetime.timedelta(days=delay)
    booking_time = self.minimal_booking_time
    if booking_time:
        limit = datetime.datetime.combine(limit.date(), booking_time, tzinfo=limit.tzinfo)
    # the result may not exist as a local time: go through UTC and back to
    # local time to get an existing datetime
    return localtime(limit.astimezone(utc))
@functional.cached_property
def min_booking_datetime(self):
    """Earliest bookable datetime, or ``None`` without minimal booking delay.

    It is the current local time plus ``minimal_booking_delay`` days. These
    are counted as working days by the calendar class named in
    ``settings.WORKING_DAY_CALENDAR`` when one is configured and the agenda
    asks for it. The time part is replaced by ``minimal_booking_time`` when
    that one is set.
    """
    delay = self.minimal_booking_delay
    if delay is None:
        return None
    # start from now, in local timezone
    start = localtime(now())
    if settings.WORKING_DAY_CALENDAR is not None and self.minimal_booking_delay_in_working_days:
        working_day_calendar = import_string(settings.WORKING_DAY_CALENDAR)()
        start = working_day_calendar.add_working_days(start, delay, keep_datetime=True)
    else:
        start = start + datetime.timedelta(days=delay)
    booking_time = self.minimal_booking_time
    if booking_time:
        start = datetime.datetime.combine(start.date(), booking_time, tzinfo=start.tzinfo)
    # the result may not exist as a local time: go through UTC and back to
    # local time to get an existing datetime
    return localtime(start.astimezone(utc))
def get_open_events(
    self,
    prefetched_queryset=False,
    min_start=None,
    max_start=None,
    bypass_delays=False,
    show_out_of_minimal_delay=False,
):
    """Return the events of the agenda that are open to booking.

    Args:
        prefetched_queryset: when true, work on ``self.prefetched_events``
            and return a list; otherwise query ``self.event_set`` and return
            a queryset.
        min_start: optional lower bound (inclusive) on the event start.
        max_start: optional upper bound (exclusive) on the event start.
        bypass_delays: when true, the agenda booking delays are not applied.
        show_out_of_minimal_delay: when true, the minimal booking delay is
            not applied.

    Only valid on agendas of kind 'events'.
    """
    assert self.kind == 'events'
    if prefetched_queryset:
        # the prefetched list is only narrowed on start datetime here
        entries = self.prefetched_events
        # we may have past events
        entries = [e for e in entries if e.start_datetime >= localtime(now())]
    else:
        # recurring events are never opened
        entries = self.event_set.filter(recurrence_days__isnull=True)
        # exclude canceled events
        entries = entries.filter(cancelled=False)
        # we never want to allow booking for past events.
        entries = entries.filter(start_datetime__gte=localtime(now()))
        # exclude non published events
        entries = entries.filter(
            Q(publication_datetime__isnull=True) | Q(publication_datetime__lte=now())
        )
    # the agenda minimal delay can only raise the requested lower bound
    if not bypass_delays and not show_out_of_minimal_delay and self.minimal_booking_delay:
        min_start = max(self.min_booking_datetime, min_start) if min_start else self.min_booking_datetime
    if min_start:
        if prefetched_queryset:
            entries = [e for e in entries if e.start_datetime >= min_start]
        else:
            entries = entries.filter(start_datetime__gte=min_start)
    # the agenda maximal delay can only lower the requested upper bound
    if not bypass_delays and self.maximal_booking_delay:
        max_start = min(self.max_booking_datetime, max_start) if max_start else self.max_booking_datetime
    if max_start:
        if prefetched_queryset:
            entries = [e for e in entries if e.start_datetime < max_start]
        else:
            entries = entries.filter(start_datetime__lt=max_start)
    return entries
def get_past_events(
    self,
    prefetched_queryset=False,
    min_start=None,
    max_start=None,
):
    """Return the events of the agenda that started before now.

    Args:
        prefetched_queryset: when true, work on ``self.prefetched_events``
            and return a list; otherwise query ``self.event_set`` and return
            a queryset.
        min_start: optional lower bound (inclusive) on the event start.
        max_start: optional upper bound (exclusive) on the event start.

    ``min_start`` and ``max_start`` are only applied in queryset mode; they
    are ignored when ``prefetched_queryset`` is true.

    Only valid on agendas of kind 'events'.
    """
    assert self.kind == 'events'
    if prefetched_queryset:
        entries = self.prefetched_events
        # we may have future events
        entries = [e for e in entries if e.start_datetime < localtime(now())]
    else:
        # no recurring events
        entries = self.event_set.filter(recurrence_days__isnull=True)
        # exclude canceled events
        entries = entries.filter(cancelled=False)
        # we want only past events
        entries = entries.filter(start_datetime__lt=localtime(now()))
    if min_start and not prefetched_queryset:
        entries = entries.filter(start_datetime__gte=min_start)
    if max_start and not prefetched_queryset:
        entries = entries.filter(start_datetime__lt=max_start)
    return entries
def get_open_recurring_events(self):
    """Return the prefetched recurring events that are still running.

    A recurring event is kept when it has no recurrence end date, or when
    that date is after the current local date. It relies on
    ``self.prefetched_recurring_events`` being set beforehand.
    """
    still_open = []
    for event in self.prefetched_recurring_events:
        if not event.recurrence_end_date:
            still_open.append(event)
        elif event.recurrence_end_date > localtime(now()).date():
            still_open.append(event)
    return still_open
@transaction.atomic
def update_event_recurrences(self):
    """Synchronize the recurrences of the agenda recurring events.

    Existing recurrences falling inside exceptions are removed first (see
    remove_recurrences_inside_exceptions), then
    ``Event.create_events_recurrences`` is called on the recurring events.
    Everything runs in a single transaction.
    """
    # recurring events are the ones holding recurrence days, recurrences the
    # ones attached to a primary event
    recurring_events = self.event_set.filter(recurrence_days__isnull=False)
    recurrences = self.event_set.filter(primary_event__isnull=False)
    if recurrences.exists():
        self.remove_recurrences_inside_exceptions(recurring_events, recurrences)
    Event.create_events_recurrences(recurring_events)
def remove_recurrences_inside_exceptions(self, recurring_events, recurrences):
    """Delete future recurrences that are no longer expected.

    Expected start datetimes are computed from ``recurring_events`` between
    now and the latest start of ``recurrences``, taking the agenda
    recurrence exceptions into account. Future recurrences starting at any
    other datetime are candidates for deletion, unless their start datetime
    was modified. Candidates still present after the deletion are stored in
    the agenda RecurrenceExceptionsReport.

    Args:
        recurring_events: queryset of the recurring (primary) events.
        recurrences: queryset of the existing recurrences.
    """
    datetimes = []
    min_start = localtime(now())
    max_start = recurrences.aggregate(dt=Max('start_datetime'))['dt']
    exceptions = self.get_recurrence_exceptions(min_start, max_start)
    # collect the start datetimes recurrences are expected to have
    for event in recurring_events:
        events = event.get_recurrences(min_start, max_start, exceptions=exceptions)
        datetimes.extend([event.start_datetime for event in events])
    events = recurrences.filter(start_datetime__gt=min_start).exclude(start_datetime__in=datetimes)
    # do not delete events where start_datetime was modified
    events_to_delete = [
        event.pk for event in events if event.datetime_slug == make_naive(event.start_datetime)
    ]
    # only candidates without booking, or matched through a booking holding a
    # cancellation datetime, are deleted
    recurrences.filter(
        Q(booking__isnull=True) | Q(booking__cancellation_datetime__isnull=False), pk__in=events_to_delete
    ).delete()
    # report events that weren't deleted because they have bookings
    report, dummy = RecurrenceExceptionsReport.objects.get_or_create(agenda=self)
    # runs after the deletion, so only surviving candidates are found
    report.events.set(recurrences.filter(pk__in=events_to_delete))
def get_booking_form_url(self):
    """Render the booking form URL template with ``settings.TEMPLATE_VARS``.

    ``None`` is returned when no URL is configured, or when rendering raises
    VariableDoesNotExist or TemplateSyntaxError.
    """
    url_template = self.booking_form_url
    if not url_template:
        return None
    rendering_context = Context(settings.TEMPLATE_VARS)
    try:
        rendered_url = Template(url_template).render(rendering_context)
    except (VariableDoesNotExist, TemplateSyntaxError):
        return None
    return rendered_url
def get_booking_check_filters(self):
    """Return the booking check filters as a list of stripped keys.

    The stored value is split on commas; an empty value gives an empty list.
    """
    raw_filters = self.booking_check_filters
    if not raw_filters:
        return []
    keys = []
    for raw_key in raw_filters.split(','):
        keys.append(raw_key.strip())
    return keys
def get_booking_user_block_template(self):
    """Return the template used to display the user block of a booking.

    That is the template configured on the agenda when there is one, and a
    default depending on the agenda kind otherwise. Note that the default is
    built in both cases.
    """
    if self.kind == 'events':
        default = '{{ booking.user_name|default:booking.label|default:"%s" }}' % _('Anonymous')
    else:
        # %% stands for a literal % sign, because of the % formatting applied
        # to the template to insert the translated fallback text
        default = """{%% if booking.label and booking.user_name %%}
{{ booking.label }} - {{ booking.user_name }}
{%% else %%}
{{ booking.user_name|default:booking.label|default:"%s" }}
{%% endif %%}""" % _(
            'booked'
        )
    return self.booking_user_block_template or default
def get_recurrence_exceptions(self, min_start, max_start):
    """Return the exceptions overlapping the ``min_start`` - ``max_start`` range.

    Exceptions are looked up on the desk of this agenda whose slug is
    '_exceptions_holder', either attached to it directly or through one of
    its unavailability calendars.
    """
    return TimePeriodException.objects.filter(
        Q(desk__slug='_exceptions_holder', desk__agenda=self)
        | Q(
            unavailability_calendar__desks__slug='_exceptions_holder',
            unavailability_calendar__desks__agenda=self,
        ),
        # overlap test: starts before the end and ends after the beginning
        start_datetime__lt=max_start,
        end_datetime__gt=min_start,
    )
def prefetch_desks_and_exceptions(self, min_date, max_date=None, with_sources=False):
    """Load desks, time periods and exceptions, and store them on the agenda.

    The desks queryset is stored as ``self.prefetched_desks``; every desk is
    annotated with ``has_past_date_time_periods`` and gets a
    ``prefetched_exceptions`` list, holding its own exceptions and the ones
    of its unavailability calendars.

    Args:
        min_date: dated time periods before that date are not prefetched.
        max_date: when set, dated time periods after that date are not
            prefetched.
        with_sources: when true, ``timeperiodexceptionsource_set`` is
            prefetched too.

    Raises:
        ValueError: when the agenda is neither of kind 'meetings' nor
            'virtual'.
    """
    if self.kind == 'meetings':
        desks = self.desk_set.all()
    elif self.kind == 'virtual':
        # desks of all real agendas
        desks = (
            Desk.objects.filter(agenda__virtual_agendas=self)
            .select_related('agenda')
            .order_by('agenda', 'label')
        )
    else:
        raise ValueError('does not work with kind %r' % self.kind)
    # flag desks having dated time periods before min_date, as those are
    # left out of the prefetch below
    past_date_time_periods = TimePeriod.objects.filter(desk=OuterRef('pk'), date__lt=min_date)
    desks = desks.annotate(has_past_date_time_periods=Exists(past_date_time_periods))
    # time periods without date are always kept
    time_period_queryset = TimePeriod.objects.filter(Q(date__isnull=True) | Q(date__gte=min_date))
    if max_date:
        time_period_queryset = time_period_queryset.filter(Q(date__isnull=True) | Q(date__lte=max_date))
    self.prefetched_desks = desks.prefetch_related(
        'unavailability_calendars', Prefetch('timeperiod_set', queryset=time_period_queryset)
    )
    if with_sources:
        self.prefetched_desks = self.prefetched_desks.prefetch_related('timeperiodexceptionsource_set')
    unavailability_calendar_ids = UnavailabilityCalendar.objects.filter(
        desks__in=self.prefetched_desks
    ).values('pk')
    # exceptions of all desks are fetched with a single queryset, then
    # dispatched to each desk in Python
    all_desks_exceptions = TimePeriodException.objects.filter(
        Q(desk__in=self.prefetched_desks) | Q(unavailability_calendar__in=unavailability_calendar_ids)
    )
    for desk in self.prefetched_desks:
        uc_ids = [uc.pk for uc in desk.unavailability_calendars.all()]
        desk.prefetched_exceptions = [
            e
            for e in all_desks_exceptions
            if e.desk_id == desk.pk or e.unavailability_calendar_id in uc_ids
        ]
@staticmethod
def filter_for_guardian(qs, guardian_external_id, child_external_id, min_start=None, max_start=None):
    """Restrict an event queryset to the custody days of a guardian.

    The shared custody agendas of the child are walked through in start date
    order. Events within the dates of an agenda are filtered with its rules
    (see filter_for_custody_agenda); events outside of every agenda (before
    the first one, between two of them, after the last one) are all kept.

    Args:
        qs: the event queryset to filter.
        guardian_external_id: external id of the guardian.
        child_external_id: external id of the child.
        min_start: when set, custody agendas ending before it are ignored.
        max_start: when set, custody agendas starting after it are ignored.

    Returns:
        ``qs`` itself when the child has no matching shared custody agenda,
        a filtered queryset otherwise.
    """
    agendas = SharedCustodyAgenda.objects.filter(child__user_external_id=child_external_id).order_by(
        'date_start'
    )
    if max_start:
        agendas = agendas.filter(date_start__lte=max_start)
    if min_start:
        agendas = agendas.filter(Q(date_end__isnull=True) | Q(date_end__gte=min_start))
    if not agendas:
        return qs
    # odd_week is needed by the rules lookups of filter_for_custody_agenda
    qs = (
        qs.annotate(week=ExtractWeek('start_datetime'))
        .annotate(week_number=Cast('week', models.IntegerField()))
        .annotate(odd_week=F('week_number') % 2)
    )
    previous_date_end = None
    filtered_qs = Event.objects.none()
    for agenda in agendas:
        filtered_qs |= Agenda.filter_for_custody_agenda(qs, agenda, guardian_external_id)
        if not previous_date_end:
            # first shared custody agenda, include all events before it begins
            filtered_qs |= qs.filter(start_datetime__lt=agenda.date_start)
        else:
            # include all events between agendas
            filtered_qs |= qs.filter(
                start_datetime__lt=agenda.date_start, start_datetime__date__gt=previous_date_end
            )
        previous_date_end = agenda.date_end
    if previous_date_end:
        # last agenda has end date, include all events after it; the end date
        # is part of the agenda (filter_for_custody_agenda keeps events up to
        # that day included), so dates are compared, as done between agendas:
        # comparing datetimes would let every event of the last day through
        filtered_qs |= qs.filter(start_datetime__date__gt=previous_date_end)
    return filtered_qs
@staticmethod
def filter_for_custody_agenda(qs, agenda, guardian_external_id):
    """Filter events within a shared custody agenda for a guardian.

    An event is kept when it starts within the agenda dates and:

    * its week day matches a custody rule of the guardian (for every week,
      or for even/odd weeks only), or it starts in a holiday period of the
      guardian, provided it does not start in a holiday period of someone
      else;
    * or it starts in an exceptional period of the guardian;

    and, in both cases, it does not start in an exceptional period of
    someone else.

    ``qs`` must carry an ``odd_week`` annotation, as it is used by the rules
    lookups.
    """
    # week days of the guardian rules, one row per day of the rule
    rules = (
        SharedCustodyRule.objects.filter(
            guardian__user_external_id=guardian_external_id,
            agenda=agenda,
        )
        .annotate(day=Func(F('days'), function='unnest', output_field=models.IntegerField()))
        .annotate(week_day=(F('day') + 1) % 7 + 1)  # convert ISO day number to db lookup day number
        .values('week_day')
    )
    rules_lookup = (
        Q(start_datetime__week_day__in=rules.filter(weeks=''))
        | Q(start_datetime__week_day__in=rules.filter(weeks='even'), odd_week=False)
        | Q(start_datetime__week_day__in=rules.filter(weeks='odd'), odd_week=True)
    )
    # periods of the agenda containing the event start; the period end is
    # excluded
    all_periods = SharedCustodyPeriod.objects.filter(
        agenda=agenda,
        date_start__lte=OuterRef('start_datetime'),
        date_end__gt=OuterRef('start_datetime'),
    )
    # periods are holiday periods when they hold a holiday rule, and
    # exceptional periods otherwise
    holiday_periods = all_periods.filter(holiday_rule__isnull=False)
    exceptional_periods = all_periods.filter(holiday_rule__isnull=True)
    qs = qs.annotate(
        in_holiday_period=Exists(holiday_periods.filter(guardian__user_external_id=guardian_external_id)),
        in_excluded_holiday_period=Exists(
            holiday_periods.exclude(guardian__user_external_id=guardian_external_id)
        ),
        in_exceptional_period=Exists(
            exceptional_periods.filter(guardian__user_external_id=guardian_external_id)
        ),
        in_excluded_exceptional_period=Exists(
            exceptional_periods.exclude(guardian__user_external_id=guardian_external_id)
        ),
    )
    # holiday periods take precedence over rules, and exceptional periods
    # over both
    rules_lookup = (rules_lookup | Q(in_holiday_period=True)) & Q(in_excluded_holiday_period=False)
    qs = qs.filter(
        (rules_lookup | Q(in_exceptional_period=True)) & Q(in_excluded_exceptional_period=False),
        start_datetime__gte=agenda.date_start,
    )
    if agenda.date_end:
        # the end date is included
        qs = qs.filter(start_datetime__date__lte=agenda.date_end)
    return qs
@staticmethod
def prefetch_recurring_events(
    qs, with_overlaps=None, user_external_id=None, start_datetime=None, end_datetime=None
):
    """Prefetch the published recurring events of every agenda in ``qs``.

    Events are stored on each agenda as ``prefetched_recurring_events``.
    When ``with_overlaps`` is truthy, events are also annotated with their
    overlaps with other recurring events of the agendas in ``qs`` and with
    the days where a booking overlap exists; ``with_overlaps`` is then
    forwarded as the agenda slugs of the booking overlap annotation, along
    with ``user_external_id``, ``start_datetime`` and ``end_datetime``.
    """
    is_published = Q(publication_datetime__isnull=True) | Q(publication_datetime__lte=now())
    recurring_events = Event.objects.filter(is_published, recurrence_days__isnull=False)

    if with_overlaps:
        recurring_events = Event.annotate_recurring_events_with_overlaps(recurring_events, agendas=qs)
        recurring_events = Event.annotate_recurring_events_with_booking_overlaps(
            recurring_events, with_overlaps, user_external_id, start_datetime, end_datetime
        )

    prefetch = Prefetch(
        'event_set',
        queryset=recurring_events,
        to_attr='prefetched_recurring_events',
    )
    return qs.prefetch_related(prefetch)
@staticmethod
def prefetch_events(qs, user_external_id=None, guardian_external_id=None, annotate_for_user=True):
    """Keep agendas of kind 'events' and prefetch their upcoming events.

    Prefetched events (stored as ``prefetched_events``) are published,
    not recurring, not cancelled and start from now on. They are annotated
    with the user's booking counts when ``user_external_id`` is given and
    ``annotate_for_user`` is true, and filtered through
    ``Agenda.filter_for_guardian`` when both ``guardian_external_id`` and
    ``user_external_id`` are given.
    """
    is_published = Q(publication_datetime__isnull=True) | Q(publication_datetime__lte=now())
    upcoming_events = Event.objects.filter(
        is_published,
        recurrence_days__isnull=True,
        cancelled=False,
        start_datetime__gte=localtime(now()),
    ).order_by()

    if user_external_id and annotate_for_user:
        upcoming_events = Event.annotate_queryset_for_user(upcoming_events, user_external_id)
    if guardian_external_id and user_external_id:
        upcoming_events = Agenda.filter_for_guardian(upcoming_events, guardian_external_id, user_external_id)

    prefetch = Prefetch('event_set', queryset=upcoming_events, to_attr='prefetched_events')
    return qs.filter(kind='events').prefetch_related(prefetch)
def is_available_for_simple_management(self):
    """Tell whether all desks of this meetings agenda share the same configuration.

    Returns False for agendas that are not of kind 'meetings'. Returns True
    when there are fewer than two desks, or when every desk has the same
    time periods, the same exceptions (only those without source), the
    same exception sources and the same unavailability calendars as the
    first desk.

    Uses ``self.prefetched_desks`` (and the data prefetched on each desk)
    when that attribute exists, database querysets otherwise.
    """
    if self.kind != 'meetings':
        return False

    use_prefetch = hasattr(self, 'prefetched_desks')
    desks = self.prefetched_desks if use_prefetch else self.desk_set.all()
    if len(desks) < 2:
        # no desk or just one, it's ok
        return True

    def values_list(obj, qs_name, qs, fields, for_exception=False):
        # return the rows of ``fields`` for the related objects of ``obj``
        if not use_prefetch:
            rows = getattr(obj, qs).values_list(*fields)
            return rows.filter(source__isnull=True) if for_exception else rows
        if for_exception:
            cached = obj.prefetched_exceptions
        else:
            cached = obj._prefetched_objects_cache.get(qs_name)  # XXX django 1.11 compat
            if cached is None:
                cached = obj._prefetched_objects_cache.get(qs)
        # queryset is prefetched, fake values_list
        return [
            tuple(getattr(inst, f) for f in fields)
            for inst in cached
            if not (for_exception and inst.source_id is not None)
        ]

    # (prefetch cache name, related manager name, compared fields, for_exception),
    # checked in this order: time periods, exceptions, sources, unavailability calendars
    comparisons = (
        ('timeperiod', 'timeperiod_set', ['weekday', 'start_time', 'end_time'], False),
        (
            'timeperiodexception',
            'timeperiodexception_set',
            ['label', 'start_datetime', 'end_datetime'],
            True,
        ),
        (
            'timeperiodexceptionsource',
            'timeperiodexceptionsource_set',
            ['ics_filename', 'ics_url', 'settings_slug', 'enabled'],
            False,
        ),
        ('unavailability_calendars', 'unavailability_calendars', ['pk'], False),
    )

    first_desk = desks[0]
    reference = [set(values_list(first_desk, *spec)) for spec in comparisons]

    for other_desk in desks[1:]:
        for spec, expected in zip(comparisons, reference):
            if set(values_list(other_desk, *spec)) != expected:
                return False
    return True
class VirtualMember(models.Model):
    """Through model linking a virtual agenda to one of its real agendas.

    Real agendas linked to a virtual agenda MUST all have the same list of
    MeetingType based on their label, slug and duration. It's enforced by
    VirtualMember.clean() and the related management views.
    """

    virtual_agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE, related_name='real_members')
    real_agenda = models.ForeignKey(
        Agenda, on_delete=models.CASCADE, related_name='virtual_members', verbose_name='Agenda'
    )

    class Meta:
        unique_together = (('virtual_agenda', 'real_agenda'),)

    def clean(self):
        """Check that the real agenda provides the meeting types of the virtual agenda.

        Nothing is checked when the virtual agenda has no meeting type yet
        (the real agenda being validated is left out of that lookup).
        Raises ValidationError listing the missing meeting types, or, when
        none is missing, the extra ones.
        """

        def signatures(meeting_types):
            # meeting types are compared on label, slug and duration
            return {(mt.label, mt.slug, mt.duration) for mt in meeting_types}

        virtual_meetingtypes = self.virtual_agenda.iter_meetingtypes(excluded_agenda=self.real_agenda)
        if not virtual_meetingtypes:
            return

        expected = signatures(virtual_meetingtypes)
        provided = signatures(self.real_agenda.iter_meetingtypes())
        errors = [_('This agenda does not have the same meeting types provided by the virtual agenda.')]

        missing = expected - provided
        if missing:
            # missing meeting type in real agenda
            for label, slug, duration in missing:
                errors.append(
                    _(
                        'Meeting type "%(label)s" (%(duration)s minutes) '
                        '(identifier: %(slug)s) does no exist.'
                    )
                    % {'label': label, 'slug': slug, 'duration': duration}
                )
            raise ValidationError(errors)

        extra = provided - expected
        if extra:
            # extra meeting type in real agenda
            for extra_type in extra:
                errors.append('Extra meeting type, "%s".' % extra_type[0])
            raise ValidationError(errors)
# (weekday number, label) pairs taken from Django's WEEKDAYS mapping, sorted by weekday number;
# used as choices for TimePeriod.weekday
WEEKDAYS_LIST = sorted(WEEKDAYS.items(), key=lambda x: x[0])
class WeekTime(collections.namedtuple('WeekTime', ['weekday', 'weekday_indexes', 'date', 'time'])):
    """Representation of a time point in a weekday, ex.: Monday at 5 o'clock.

    When a date is given, the weekday is always derived from it and the
    weekday argument is ignored.
    """

    def __new__(cls, weekday, weekday_indexes, date, time):
        effective_weekday = date.weekday() if date else weekday
        return super().__new__(cls, effective_weekday, weekday_indexes, date, time)

    def __repr__(self):
        if self.date:
            day_part = self.date
        else:
            day_part = force_str(WEEKDAYS[self.weekday])
        time_part = date_format(self.time, 'TIME_FORMAT')
        return '%s / %s' % (day_part, time_part)

    def keep_only_weekday_and_time(self):
        """Return a copy without date and weekday indexes."""
        return WeekTime(self.weekday, None, None, self.time)
# choices for the items of TimePeriod.weekday_indexes: which occurrence(s)
# of the weekday within the month a time period applies to
WEEK_CHOICES = [
    (1, _('First of the month')),
    (2, _('Second of the month')),
    (3, _('Third of the month')),
    (4, _('Fourth of the month')),
    (5, _('Fifth of the month')),
]
class TimePeriod(models.Model):
    """Opening period, either repeating on a weekday or set on a single date.

    Exactly one of ``weekday`` and ``date`` is set (enforced by the
    ``date_xor_weekday`` constraint). A period is attached to a desk, or to
    an agenda, which must then be a virtual one (see ``save()``); in the
    latter case it is reachable as ``agenda.excluded_timeperiods``.
    """

    weekday = models.IntegerField(_('Week day'), choices=WEEKDAYS_LIST, null=True)
    # optional restriction to some occurrences of the weekday in the month
    weekday_indexes = ArrayField(
        models.IntegerField(choices=WEEK_CHOICES),
        verbose_name=_('Repeat'),
        blank=True,
        null=True,
    )
    date = models.DateField(_('Date'), null=True)
    start_time = models.TimeField(_('Start'))
    end_time = models.TimeField(_('End'))
    desk = models.ForeignKey('Desk', on_delete=models.CASCADE, null=True)
    agenda = models.ForeignKey(
        Agenda, on_delete=models.CASCADE, null=True, related_name='excluded_timeperiods'
    )

    class Meta:
        ordering = ['weekday', 'date', 'start_time']
        constraints = [
            models.CheckConstraint(
                check=Q(date__isnull=True, weekday__isnull=False)
                | Q(date__isnull=False, weekday__isnull=True),
                name='date_xor_weekday',
            )
        ]

    def __str__(self):
        """Return "<day> / <start> -> <end>" (with an arrow), day being a date or a weekday."""
        if self.date:
            label = date_format(self.date, 'l d F Y')
        else:
            label = force_str(WEEKDAYS[self.weekday])
            if self.weekday_indexes:
                label = _('%(weekday)s (%(ordinals)s of the month)') % {
                    'weekday': label,
                    'ordinals': ', '.join(ordinal(i) for i in self.weekday_indexes),
                }
        # start and end times need a separator, they were glued together otherwise
        label = '%s / %s \u2192 %s' % (
            label,
            date_format(self.start_time, 'TIME_FORMAT'),
            date_format(self.end_time, 'TIME_FORMAT'),
        )
        return mark_safe(label)

    def save(self, *args, **kwargs):
        """Save the period, after checking a linked agenda is a virtual one."""
        if self.agenda:
            assert self.agenda.kind == 'virtual', "a time period can only reference a virtual agenda"
        super().save(*args, **kwargs)

    @property
    def weekday_str(self):
        """Label of the weekday; only meaningful for weekday based periods."""
        return WEEKDAYS[self.weekday]

    @classmethod
    def import_json(cls, data):
        """Create the period described by ``data`` unless an identical one exists."""
        data = clean_import_data(cls, data)
        cls.objects.update_or_create(defaults=data, **data)

    def export_json(self):
        """Return a JSON serializable description of the period (desk and agenda left out)."""
        return {
            'weekday': self.weekday,
            'weekday_indexes': self.weekday_indexes,
            'date': self.date.strftime('%Y-%m-%d') if self.date else None,
            'start_time': self.start_time.strftime('%H:%M'),
            'end_time': self.end_time.strftime('%H:%M'),
        }

    def duplicate(self, desk_target=None, agenda_target=None):
        """Save and return a copy of the period, optionally moved to another desk/agenda."""
        # clone current period
        new_period = copy.deepcopy(self)
        new_period.pk = None
        # set desk
        new_period.desk = desk_target or self.desk
        # set agenda
        new_period.agenda = agenda_target or self.agenda
        # store new period
        new_period.save()
        return new_period

    def as_weektime_interval(self):
        """Return the period as an Interval between two WeekTime points."""
        return Interval(
            WeekTime(self.weekday, self.weekday_indexes, self.date, self.start_time),
            WeekTime(self.weekday, self.weekday_indexes, self.date, self.end_time),
        )

    def as_shared_timeperiods(self):
        """Return the period as a SharedTimePeriod holding only this period's desk."""
        return SharedTimePeriod(
            weekday=self.weekday,
            weekday_indexes=self.weekday_indexes,
            start_time=self.start_time,
            end_time=self.end_time,
            date=self.date,
            desks=[self.desk],
        )
@functools.total_ordering
class SharedTimePeriod:
    """
    Hold common timeperiod for multiple desks.

    To improve performance when generating meetings slots for virtual
    agendas or agendas with many desks, we deduplicate time-periods between
    all desks of all agendas.

    Deduplication is based on a common key, and implemented through __eq__
    and __lt__ which will be used by itertools.groupby().

       (weekday, start_time, end_time, date)

    it's done in the deduplicate() classmethod.

    At the level of get_all_slots() timeperiods are re-duplicated if the
    min_datetime,max_datetime of the desk's agendas differs (see the code
    of get_all_slots() for details).
    """

    __slots__ = ['weekday', 'weekday_indexes', 'start_time', 'end_time', 'date', 'desks']

    # instances are mutable and compared by value: keep them explicitly unhashable
    __hash__ = None

    def __init__(self, weekday, weekday_indexes, start_time, end_time, date, desks):
        self.weekday = weekday
        self.weekday_indexes = weekday_indexes
        self.start_time = start_time
        self.end_time = end_time
        self.date = date
        self.desks = set(desks)

    def __str__(self):
        if self.weekday is not None:
            day_label = force_str(WEEKDAYS[self.weekday])
        else:
            # date based period without weekday, there is no weekday label to look up
            day_label = date_format(self.date, 'l d F Y')
        # start and end times need a separator, they were glued together otherwise
        return '%s / %s \u2192 %s' % (
            day_label,
            date_format(self.start_time, 'TIME_FORMAT'),
            date_format(self.end_time, 'TIME_FORMAT'),
        )

    def _key(self):
        # deduplication/ordering key; weekday_indexes and desks are deliberately left out
        return (self.weekday, self.start_time, self.end_time, self.date)

    def __eq__(self, other):
        if not isinstance(other, SharedTimePeriod):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other):
        if not isinstance(other, SharedTimePeriod):
            return NotImplemented
        return self._key() < other._key()

    def get_time_slots(self, min_datetime, max_datetime, meeting_duration, base_duration):
        """Generate all possible time slots between min_datetime and max_datetime
        of duration meeting_duration minutes and spaced by base_duration
        minutes, i.e.

            compute a list [a,b] -> [c,d] -> ...
            where b-a = meeting_duration and c-a = base_duration.

        We start with the first time following min_datetime and being on
        the same weekday of the current period.

        Then we iterate, advancing by base_duration minutes each time.

        If we cross the end_time of the period or end of the current_day
        (means end_time is midnight), it advance time to self.start_time on
        the next week (same weekday, same start, one week in the future).

        When it crosses max_datetime it stops.

        A period set on a date yields nothing when the date is out of the
        [min_datetime, max_datetime] range, and never moves to another week.

        Generated start_datetime MUST be in the local timezone, and the local
        timezone must not change, as the API needs it to generate stable ids.
        """
        if self.date and not (min_datetime.date() <= self.date <= max_datetime.date()):
            return

        meeting_duration = datetime.timedelta(minutes=meeting_duration)
        duration = datetime.timedelta(minutes=base_duration)

        if not self.date:
            real_min_datetime = min_datetime + datetime.timedelta(days=self.weekday - min_datetime.weekday())
            if real_min_datetime < min_datetime:
                real_min_datetime += datetime.timedelta(days=7)
        else:
            real_min_datetime = make_aware(
                datetime.datetime(day=self.date.day, month=self.date.month, year=self.date.year)
            )

        # make sure datetime in local timezone, it's ABSOLUTELY necessary
        # to have stable event ids in the API.
        real_min_datetime = real_min_datetime.replace(
            hour=12
        )  # so aware datetime will be in the dst of the day
        event_datetime = make_aware(make_naive(real_min_datetime)).replace(
            hour=self.start_time.hour, minute=self.start_time.minute, second=0, microsecond=0
        )
        # don't start before min_datetime
        event_datetime = max(event_datetime, min_datetime)

        # get slots; the loop condition guarantees slots never start at or after max_datetime
        while event_datetime < max_datetime:
            end_time = event_datetime + meeting_duration
            next_time = event_datetime + duration
            if (
                end_time.time() > self.end_time
                or event_datetime.date() != next_time.date()
                or (self.weekday_indexes and get_weekday_index(event_datetime) not in self.weekday_indexes)
            ):
                # if time slot is not repeating, end now
                if self.date:
                    break
                # switch to naive time for day/week changes
                event_datetime = make_naive(event_datetime)
                # back to morning
                event_datetime = event_datetime.replace(
                    hour=self.start_time.hour, minute=self.start_time.minute
                )
                # but next week
                event_datetime += datetime.timedelta(days=7)
                # and re-align to timezone afterwards
                event_datetime = make_aware(event_datetime)
                continue

            yield event_datetime
            event_datetime = next_time

    @classmethod
    def from_weektime_interval(cls, weektime_interval, desks=(), date=None, weekday_indexes=None):
        """Build a period from a (begin, end) pair of WeekTime-like points.

        Both points must be on the same weekday. The ``date`` and
        ``weekday_indexes`` arguments are only used when neither point
        carries a value of its own.
        """
        begin, end = weektime_interval
        assert begin.weekday == end.weekday
        return cls(
            weekday=begin.weekday,
            weekday_indexes=begin.weekday_indexes or end.weekday_indexes or weekday_indexes,
            start_time=begin.time,
            end_time=end.time,
            date=begin.date or end.date or date,
            desks=desks,
        )
class MeetingType(models.Model):
    """Kind of meeting (label and duration) offered by a non-virtual agenda."""

    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160)
    duration = models.IntegerField(_('Duration (in minutes)'), default=30, validators=[MinValueValidator(1)])
    deleted = models.BooleanField(_('Deleted'), default=False)

    class Meta:
        ordering = ['duration', 'label']
        unique_together = ['agenda', 'slug']

    @property
    def base_slug(self):
        """Slug candidate built from the label."""
        return slugify(self.label)

    def save(self, *args, **kwargs):
        """Save the meeting type, generating a slug within the agenda when it has none."""
        assert self.agenda.kind != 'virtual', "a meetingtype can't reference a virtual agenda"
        if not self.slug:
            self.slug = generate_slug(self, agenda=self.agenda)
        super().save(*args, **kwargs)

    @classmethod
    def import_json(cls, data):
        """Create or update the meeting type identified by its slug and agenda."""
        cleaned = clean_import_data(cls, data)
        cls.objects.update_or_create(slug=cleaned['slug'], agenda=cleaned['agenda'], defaults=cleaned)

    def export_json(self):
        """Return a JSON serializable description of the meeting type."""
        return {key: getattr(self, key) for key in ('label', 'slug', 'duration')}

    def duplicate(self, agenda_target=None):
        """Save and return a copy of the meeting type.

        The copy keeps its slug when it goes to ``agenda_target``; within
        the same agenda the slug is reset so that a new one is generated.
        """
        clone = copy.deepcopy(self)
        clone.pk = None
        if not agenda_target:
            clone.slug = None
        else:
            clone.agenda = agenda_target
        clone.save()
        return clone
class Event(models.Model):
id = models.BigAutoField(primary_key=True)
INTERVAL_CHOICES = [
(1, _('Every week')),
(2, _('Every two weeks')),
(3, _('Every three weeks')),
]
agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
start_datetime = models.DateTimeField(_('Date/time'))
recurrence_days = ArrayField(
models.IntegerField(choices=WEEKDAY_CHOICES),
verbose_name=_('Recurrence days'),
blank=True,
null=True,
)
recurrence_week_interval = models.IntegerField(_('Repeat'), choices=INTERVAL_CHOICES, default=1)
recurrence_end_date = models.DateField(
_('Recurrence end date'),
null=True,
blank=True,
help_text=_('If left blank, a one-year maximal booking delay will be applied for this event.'),
)
primary_event = models.ForeignKey('self', null=True, on_delete=models.CASCADE, related_name='recurrences')
duration = models.PositiveIntegerField(_('Duration (in minutes)'), default=None, null=True, blank=True)
publication_datetime = models.DateTimeField(_('Publication date/time'), blank=True, null=True)
places = models.PositiveIntegerField(_('Places'))
waiting_list_places = models.PositiveIntegerField(_('Places in waiting list'), default=0)
label = models.CharField(
_('Label'),
max_length=150,
null=True,
blank=True,
help_text=_('Optional label to identify this date.'),
)
slug = models.SlugField(_('Identifier'), max_length=160, blank=True, validators=[validate_not_digit])
description = models.TextField(
_('Description'), null=True, blank=True, help_text=_('Optional event description.')
)
pricing = models.CharField(_('Pricing'), max_length=150, null=True, blank=True)
url = models.CharField(_('URL'), max_length=200, null=True, blank=True)
booked_places = models.PositiveSmallIntegerField(default=0)
booked_waiting_list_places = models.PositiveSmallIntegerField(default=0)
almost_full = models.BooleanField(default=False)
full = models.BooleanField(default=False)
cancelled = models.BooleanField(default=False)
cancellation_scheduled = models.BooleanField(default=False)
checked = models.BooleanField(default=False)
meeting_type = models.ForeignKey(MeetingType, null=True, on_delete=models.CASCADE)
desk = models.ForeignKey('Desk', null=True, on_delete=models.CASCADE)
resources = models.ManyToManyField('Resource')
custom_fields = models.JSONField(blank=True, default=dict)
almost_full_notification_timestamp = models.DateTimeField(null=True, blank=True)
full_notification_timestamp = models.DateTimeField(null=True, blank=True)
cancelled_notification_timestamp = models.DateTimeField(null=True, blank=True)
class Meta:
ordering = ['agenda', 'start_datetime', 'duration', 'label']
unique_together = ('agenda', 'slug')
def __str__(self):
if self.label:
return self.label
return date_format(localtime(self.start_datetime), format='DATETIME_FORMAT')
@functional.cached_property
def cancellation_status(self):
if self.cancelled:
return _('Cancelled')
if self.cancellation_scheduled:
return _('Cancellation in progress')
def save(self, seen_slugs=None, *args, **kwargs):
assert self.agenda.kind != 'virtual', "an event can't reference a virtual agenda"
assert not (self.slug and self.slug.isdigit()), 'slug cannot be a number'
self.start_datetime = self.start_datetime.replace(second=0, microsecond=0)
if not self.slug:
self.slug = generate_slug(self, seen_slugs=seen_slugs, agenda=self.agenda)
return super().save(*args, **kwargs)
@contextmanager
def update_recurrences(self, changed_data, cleaned_data, protected_fields, exclude_fields):
with transaction.atomic():
if any(field for field in changed_data if field in protected_fields):
self.recurrences.all().delete()
elif self.recurrence_days:
update_fields = {
field: value for field, value in cleaned_data.items() if field not in exclude_fields
}
self.recurrences.update(**update_fields)
yield
if self.recurrence_days:
if self.recurrence_end_date:
self.recurrences.filter(start_datetime__gt=self.recurrence_end_date).delete()
self.create_all_recurrences()
@property
def base_slug(self):
# label can be empty
return slugify(self.label or ('%s-event' % self.agenda.label))
def main_list_full(self):
return bool(self.booked_places >= self.places)
def set_is_checked(self):
if not self.agenda.mark_event_checked_auto:
return
if self.checked:
return
booking_qs = self.booking_set.filter(
cancellation_datetime__isnull=True,
in_waiting_list=False,
user_was_present__isnull=True,
)
if booking_qs.exists():
return
self.checked = True
self.save(update_fields=['checked'])
def in_bookable_period(self, bypass_delays=False):
if self.publication_datetime and now() < self.publication_datetime:
return False
if (
not bypass_delays
and self.agenda.maximal_booking_delay
and self.start_datetime > self.agenda.max_booking_datetime
):
return False
if self.recurrence_days is not None:
# bookable recurrences probably exist
return True
if (
not bypass_delays
and self.agenda.minimal_booking_delay
and self.start_datetime < self.agenda.min_booking_datetime
):
return False
if self.start_datetime < now():
return False
return True
def is_day_past(self):
return self.start_datetime.date() <= now().date()
@staticmethod
def annotate_queryset_for_user(qs, user_external_id, with_status=False):
qs = qs.annotate(
user_places_count=Count(
'booking',
filter=Q(
booking__cancellation_datetime__isnull=True,
booking__in_waiting_list=False,
booking__user_external_id=user_external_id,
),
),
user_waiting_places_count=Count(
'booking',
filter=Q(
booking__cancellation_datetime__isnull=True,
booking__in_waiting_list=True,
booking__user_external_id=user_external_id,
),
),
)
if with_status:
qs = qs.annotate(
user_absence_count=Count(
'booking',
filter=Q(
booking__cancellation_datetime__isnull=True,
booking__user_was_present=False,
booking__user_external_id=user_external_id,
),
),
user_cancelled_count=Count(
'booking',
filter=Q(
booking__cancellation_datetime__isnull=False,
booking__user_external_id=user_external_id,
),
),
)
return qs
@staticmethod
def annotate_queryset_with_overlaps(qs, other_events=None):
if not other_events:
other_events = qs
common_annotations = {
'computed_end_datetime': ExpressionWrapper(
F('start_datetime') + datetime.timedelta(minutes=1) * F('duration'),
output_field=models.DateTimeField(),
),
'computed_slug': Concat('agenda__slug', Value('@'), 'slug', output_field=models.CharField()),
}
qs = qs.annotate(**common_annotations)
other_events = other_events.annotate(**common_annotations)
overlapping_events = other_events.filter(
start_datetime__lt=OuterRef('computed_end_datetime'),
computed_end_datetime__gt=OuterRef('start_datetime'),
).exclude(pk=OuterRef('pk'))
return qs.annotate(
overlaps=ArraySubquery(
overlapping_events.values('computed_slug'),
output_field=ArrayField(models.CharField()),
),
has_overlap=Exists(overlapping_events),
)
@staticmethod
def annotate_recurring_events_with_overlaps(qs, agendas=None):
qs = qs.annotate(
start_hour=Cast('start_datetime', models.TimeField()),
computed_end_datetime=ExpressionWrapper(
F('start_datetime') + datetime.timedelta(minutes=1) * F('duration'),
output_field=models.DateTimeField(),
),
end_hour=Cast('computed_end_datetime', models.TimeField()),
computed_slug=Concat('agenda__slug', Value('@'), 'slug', output_field=models.CharField()),
)
overlapping_events = qs.filter(
start_hour__lt=OuterRef('end_hour'),
end_hour__gt=OuterRef('start_hour'),
recurrence_days__overlap=F('recurrence_days'),
).exclude(pk=OuterRef('pk'))
if agendas:
overlapping_events = overlapping_events.filter(agenda__in=agendas)
json_object = JSONObject(
slug=F('computed_slug'),
days=F('recurrence_days'),
)
return qs.annotate(
overlaps=ArraySubquery(
overlapping_events.values(json=json_object),
output_field=ArrayField(models.JSONField()),
)
)
@staticmethod
def annotate_recurring_events_with_booking_overlaps(
qs, agenda_slugs, user_external_id, start_datetime, end_datetime
):
recurrences = Event.objects.filter(primary_event=OuterRef('pk'))
recurrences = recurrences.annotate(
dj_weekday=ExtractWeekDay('start_datetime'),
dj_weekday_int=Cast('dj_weekday', models.IntegerField()),
weekday=(F('dj_weekday_int') - 2) % 7,
)
recurrences_with_overlaps = Event.annotate_queryset_with_booked_event_overlaps(
recurrences, agenda_slugs, user_external_id, start_datetime, end_datetime
).filter(has_overlap=True)
return qs.annotate(
days_with_booking_overlaps=ArraySubquery(
recurrences_with_overlaps.values('weekday'), output_field=ArrayField(models.IntegerField())
)
)
@staticmethod
def annotate_queryset_with_booked_event_overlaps(
qs, agenda_slugs, user_external_id, start_datetime, end_datetime, exclude_events=None
):
booked_events = Event.objects.filter(
agenda__slug__in=agenda_slugs,
start_datetime__gte=start_datetime,
booking__user_external_id=user_external_id,
booking__cancellation_datetime__isnull=True,
)
if end_datetime:
booked_events = booked_events.filter(start_datetime__lte=end_datetime)
if exclude_events:
booked_events = booked_events.exclude(pk__in=exclude_events)
return Event.annotate_queryset_with_overlaps(qs, booked_events)
@staticmethod
def annotate_booking_checks(qs):
bookings = (
Booking.objects.filter(
event=OuterRef('pk'), cancellation_datetime__isnull=True, in_waiting_list=False
)
.order_by()
.values('event')
)
present_count = bookings.filter(user_was_present=True).annotate(count=Count('event')).values('count')
absent_count = bookings.filter(user_was_present=False).annotate(count=Count('event')).values('count')
notchecked_count = (
bookings.filter(user_was_present__isnull=True).annotate(count=Count('event')).values('count')
)
return qs.annotate(
present_count=Coalesce(Subquery(present_count, output_field=IntegerField()), Value(0)),
absent_count=Coalesce(Subquery(absent_count, output_field=IntegerField()), Value(0)),
notchecked_count=Coalesce(Subquery(notchecked_count, output_field=IntegerField()), Value(0)),
)
@property
def remaining_places(self):
return max(0, self.places - self.booked_places)
@property
def remaining_waiting_list_places(self):
return max(0, self.waiting_list_places - self.booked_waiting_list_places)
@property
def end_datetime(self):
if self.meeting_type:
minutes = self.meeting_type.duration
else:
minutes = self.duration
if minutes is None:
return None
return self.start_datetime + datetime.timedelta(minutes=minutes)
def get_absolute_url(self):
return reverse('chrono-manager-event-edit', kwargs={'pk': self.agenda.id, 'event_pk': self.id})
def get_absolute_view_url(self):
return reverse('chrono-manager-event-view', kwargs={'pk': self.agenda.id, 'event_pk': self.id})
def get_booking_form_url(self):
if not self.agenda.booking_form_url:
return
template_vars = Context(settings.TEMPLATE_VARS)
try:
url = Template(self.agenda.booking_form_url).render(template_vars)
url += '&' if '?' in url else '?'
url += 'agenda=%s&event=%s' % (self.agenda.slug, self.slug)
return mark_safe(url)
except (VariableDoesNotExist, TemplateSyntaxError):
return
@classmethod
def import_json(cls, data):
try:
data['start_datetime'] = make_aware(
datetime.datetime.strptime(data['start_datetime'], '%Y-%m-%d %H:%M:%S')
)
except ValueError:
raise AgendaImportError(_('Bad datetime format "%s"') % data['start_datetime'])
data = clean_import_data(cls, data)
if data.get('slug'):
event, dummy = cls.objects.update_or_create(
agenda=data['agenda'], slug=data['slug'], defaults=data
)
else:
event = cls(**data)
event.save()
if event.recurrence_days:
event.refresh_from_db()
if event.recurrence_end_date:
event.recurrences.filter(start_datetime__gt=event.recurrence_end_date).delete()
update_fields = {
field: getattr(event, field)
for field in [
'label',
'duration',
'publication_datetime',
'places',
'waiting_list_places',
'description',
'pricing',
'url',
]
}
event.recurrences.update(**update_fields)
event.create_all_recurrences()
def export_json(self):
recurrence_end_date = (
self.recurrence_end_date.strftime('%Y-%m-%d') if self.recurrence_end_date else None
)
return {
'start_datetime': make_naive(self.start_datetime).strftime('%Y-%m-%d %H:%M:%S'),
'publication_datetime': make_naive(self.publication_datetime).strftime('%Y-%m-%d %H:%M:%S')
if self.publication_datetime
else None,
'recurrence_days': self.recurrence_days,
'recurrence_week_interval': self.recurrence_week_interval,
'recurrence_end_date': recurrence_end_date,
'places': self.places,
'waiting_list_places': self.waiting_list_places,
'label': self.label,
'slug': self.slug,
'description': self.description,
'url': self.url,
'pricing': self.pricing,
'duration': self.duration,
}
def duplicate(self, agenda_target=None, primary_event=None, label=None, start_datetime=None):
new_event = copy.deepcopy(self)
new_event.pk = None
if label:
new_event.label = label
if start_datetime:
new_event.start_datetime = start_datetime
if agenda_target:
new_event.agenda = agenda_target
else:
new_event.slug = None
if primary_event:
new_event.primary_event = primary_event
new_event.save()
return new_event
def cancel(self, cancel_bookings=True):
bookings_to_cancel = self.booking_set.filter(cancellation_datetime__isnull=True).all()
if cancel_bookings and bookings_to_cancel.exclude(cancel_callback_url='').exists():
# booking cancellation needs network calls, schedule it for later
self.cancellation_scheduled = True
self.save()
else:
with transaction.atomic():
for booking in bookings_to_cancel:
booking.cancel()
self.cancelled = True
self.save()
def get_recurrences(self, min_datetime, max_datetime, exceptions=None):
recurrences = []
rrule_set = rruleset()
if exceptions is None:
exceptions = self.agenda.get_recurrence_exceptions(min_datetime, max_datetime)
for exception in exceptions:
exception_start = localtime(exception.start_datetime)
event_start = localtime(self.start_datetime)
if event_start.time() < exception_start.time():
exception_start += datetime.timedelta(days=1)
exception_start = exception_start.replace(
hour=event_start.hour, minute=event_start.minute, second=0, microsecond=0
)
rrule_set.exrule(
rrule(
freq=DAILY,
dtstart=make_naive(exception_start),
until=make_naive(exception.end_datetime),
)
)
event_base = Event(
agenda=self.agenda,
primary_event=self,
slug=self.slug,
duration=self.duration,
places=self.places,
waiting_list_places=self.waiting_list_places,
publication_datetime=self.publication_datetime,
label=self.label,
description=self.description,
pricing=self.pricing,
url=self.url,
custom_fields=self.custom_fields,
)
# remove pytz info because dateutil doesn't support DST changes
min_datetime = make_naive(min_datetime)
max_datetime = make_naive(max_datetime)
rrule_set.rrule(rrule(dtstart=make_naive(self.start_datetime), **self.recurrence_rule))
for start_datetime in rrule_set.between(min_datetime, max_datetime, inc=True):
event = copy.copy(event_base)
# add timezone back
aware_start_datetime = make_aware(start_datetime)
event.slug = '%s--%s' % (
event.slug,
aware_start_datetime.strftime('%Y-%m-%d-%H%M'),
)
event.start_datetime = aware_start_datetime.astimezone(utc)
recurrences.append(event)
return recurrences
def get_recurrence_display(self):
time = date_format(localtime(self.start_datetime), 'TIME_FORMAT')
days_count = len(self.recurrence_days)
if days_count == 7:
repeat = _('Daily')
elif days_count > 1 and (self.recurrence_days[-1] - self.recurrence_days[0]) == days_count - 1:
# days are contiguous
repeat = _('From %(weekday)s to %(last_weekday)s') % {
'weekday': str(WEEKDAYS[self.recurrence_days[0]]),
'last_weekday': str(WEEKDAYS[self.recurrence_days[-1]]),
}
else:
repeat = _('On %(weekdays)s') % {
'weekdays': ', '.join([str(WEEKDAYS_PLURAL[i]) for i in self.recurrence_days])
}
recurrence_display = _('%(On_day_x)s at %(time)s') % {'On_day_x': repeat, 'time': time}
if self.recurrence_week_interval > 1:
if self.recurrence_week_interval == 2:
every_x_weeks = _('every two weeks')
elif self.recurrence_week_interval == 3:
every_x_weeks = _('every three weeks')
recurrence_display = _('%(Every_x_days)s, once %(every_x_weeks)s') % {
'Every_x_days': recurrence_display,
'every_x_weeks': every_x_weeks,
}
if self.start_datetime > now():
start_date = date_format(self.start_datetime, 'DATE_FORMAT')
recurrence_display = _('%(Every_x_days)s, from %(date)s') % {
'Every_x_days': recurrence_display,
'date': start_date,
}
if self.recurrence_end_date:
end_date = date_format(self.recurrence_end_date, 'DATE_FORMAT')
recurrence_display = _('%(Every_x_days)s, until %(date)s') % {
'Every_x_days': recurrence_display,
'date': end_date,
}
return recurrence_display
@property
def recurrence_rule(self):
recurrence_rule = {
'freq': WEEKLY,
'byweekday': self.recurrence_days,
'interval': self.recurrence_week_interval,
}
if self.recurrence_end_date:
recurrence_rule['until'] = datetime.datetime.combine(
self.recurrence_end_date, datetime.time(0, 0)
)
else:
recurrence_rule['until'] = make_naive(now() + datetime.timedelta(days=365))
return recurrence_rule
def has_recurrences_booked(self, after=None):
return Booking.objects.filter(
event__primary_event=self,
event__start_datetime__gt=after or now(),
cancellation_datetime__isnull=True,
).exists()
def create_all_recurrences(self):
Event.create_events_recurrences([self])
@classmethod
def create_events_recurrences(cls, events):
for event in events:
if event.recurrence_end_date:
max_datetime = datetime.datetime.combine(event.recurrence_end_date, datetime.time(0, 0))
else:
max_datetime = make_naive(now() + datetime.timedelta(days=365))
existing_recurrences = event.recurrences.values_list('slug', flat=True)
all_recurrences = event.get_recurrences(localtime(event.start_datetime), make_aware(max_datetime))
recurrences_to_create = [r for r in all_recurrences if r.slug not in existing_recurrences]
if recurrences_to_create:
Event.objects.bulk_create(recurrences_to_create, ignore_conflicts=True)
@property
def datetime_slug(self):
    '''Parse the datetime encoded after the last ``--`` of a recurrence slug.

    Only meaningful on a recurrence (an event with a primary event); the
    trailing part of the slug must follow the ``%Y-%m-%d-%H%M`` format.
    '''
    assert self.primary_event_id is not None, 'only for event recurrence'
    # keep only what follows the last '--' separator
    _prefix, _separator, datetime_part = self.slug.rpartition('--')
    return datetime.datetime.strptime(datetime_part, '%Y-%m-%d-%H%M')
def get_custom_fields(self):
    '''Return this event's values for the custom fields of its agenda's events type.

    The result maps each custom field ``varname`` to the value stored in
    ``self.custom_fields`` (``None`` when absent). An empty dict is returned
    when the agenda has no events type.
    '''
    if not self.agenda.events_type:
        return {}
    return {
        field['varname']: self.custom_fields.get(field['varname'])
        for field in self.agenda.events_type.get_custom_fields()
    }
class EventsType(models.Model):
    '''A type of events, carrying a list of custom field definitions.'''

    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
    label = models.CharField(_('Label'), max_length=150)
    custom_fields = models.JSONField(blank=True, default=list)

    def __str__(self):
        return self.label

    class Meta:
        ordering = ['label']

    def save(self, *args, **kwargs):
        '''Generate a slug when none is set, then save.'''
        if not self.slug:
            generated = generate_slug(self)
            self.slug = generated
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        '''Slugified label.'''
        return slugify(self.label)

    def get_custom_fields(self):
        '''Return the well-formed custom field definitions.

        Only dict entries having a truthy ``varname``, ``label`` and
        ``field_type`` are kept; anything else is silently ignored. An empty
        list is returned when ``custom_fields`` is not a list.
        '''
        if not isinstance(self.custom_fields, list):
            return []
        required_keys = ('varname', 'label', 'field_type')
        return [
            definition
            for definition in self.custom_fields
            if isinstance(definition, dict) and all(definition.get(key) for key in required_keys)
        ]

    @classmethod
    def import_json(cls, data, overwrite=False):
        '''Create or update an events type from exported data.

        Returns a ``(created, events_type)`` tuple. ``overwrite`` is accepted
        but not used here.
        '''
        cleaned = clean_import_data(cls, data)
        identifier = cleaned.pop('slug')
        events_type, created = cls.objects.update_or_create(slug=identifier, defaults=cleaned)
        return created, events_type

    def export_json(self):
        '''Serialize slug, label and custom fields as a dict.'''
        exported = {}
        exported['slug'] = self.slug
        exported['label'] = self.label
        exported['custom_fields'] = self.custom_fields
        return exported
class BookingColor(models.Model):
    '''A labelled color slot; indexes cycle through ``COLOR_COUNT`` values.'''

    COLOR_COUNT = 8

    label = models.CharField(_('Label'), max_length=250)
    index = models.PositiveSmallIntegerField()

    class Meta:
        unique_together = ('label',)
        ordering = ('pk',)

    def save(self, *args, **kwargs):
        '''Assign the index following the last stored color when none is set, then save.'''
        if self.index is None:
            previous = BookingColor.objects.last()
            # with no stored color, start from -1 so that the first index is 0
            previous_index = previous.index if previous else -1
            self.index = (previous_index + 1) % self.COLOR_COUNT
        super().save(*args, **kwargs)

    def __str__(self):
        return '%s' % self.label
class Booking(models.Model):
    '''A booking on an event, optionally grouped under a primary booking.'''

    event = models.ForeignKey(Event, on_delete=models.CASCADE)
    extra_data = models.JSONField(null=True)
    anonymization_datetime = models.DateTimeField(null=True)
    cancellation_datetime = models.DateTimeField(null=True)
    email_reminder_datetime = models.DateTimeField(null=True)
    sms_reminder_datetime = models.DateTimeField(null=True)
    in_waiting_list = models.BooleanField(default=False)
    creation_datetime = models.DateTimeField(auto_now_add=True)
    # primary booking is used to group multiple bookings together
    primary_booking = models.ForeignKey(
        'self', null=True, on_delete=models.CASCADE, related_name='secondary_booking_set'
    )

    label = models.CharField(max_length=250, blank=True)
    user_display_label = models.CharField(
        verbose_name=_('Label displayed to user'), max_length=250, blank=True
    )
    user_external_id = models.CharField(max_length=250, blank=True)
    user_last_name = models.CharField(max_length=250, blank=True)
    user_first_name = models.CharField(max_length=250, blank=True)
    user_email = models.EmailField(blank=True)
    user_phone_number = models.CharField(max_length=30, blank=True)
    user_was_present = models.BooleanField(null=True)
    user_check_type_slug = models.CharField(max_length=160, blank=True, null=True)
    user_check_type_label = models.CharField(max_length=150, blank=True, null=True)
    out_of_min_delay = models.BooleanField(default=False)

    extra_emails = ArrayField(models.EmailField(), default=list)
    extra_phone_numbers = ArrayField(models.CharField(max_length=16), default=list)

    form_url = models.URLField(blank=True)
    backoffice_url = models.URLField(blank=True)
    cancel_callback_url = models.URLField(blank=True)
    color = models.ForeignKey(BookingColor, null=True, on_delete=models.SET_NULL, related_name='bookings')

    @property
    def user_name(self):
        '''First and last name joined by a space, stripped.'''
        return ('%s %s' % (self.user_first_name, self.user_last_name)).strip()

    @cached_property
    def emails(self):
        '''Deduplicated list of ``extra_emails`` plus ``user_email`` when set (order not guaranteed).'''
        emails = set(self.extra_emails)
        if self.user_email:
            emails.add(self.user_email)
        return list(emails)

    @cached_property
    def phone_numbers(self):
        '''Deduplicated list of ``extra_phone_numbers`` plus ``user_phone_number`` when set.'''
        phone_numbers = set(self.extra_phone_numbers)
        if self.user_phone_number:
            phone_numbers.add(self.user_phone_number)
        return list(phone_numbers)

    def cancel(self, trigger_callback=False):
        '''Mark this booking and its secondary bookings as cancelled now.

        When ``trigger_callback`` is true and a ``cancel_callback_url`` is
        set, the URL is POSTed and an HTTP error status raises.
        '''
        timestamp = now()
        with transaction.atomic():
            self.secondary_booking_set.update(cancellation_datetime=timestamp)
            self.cancellation_datetime = timestamp
            self.save()
            # NOTE(review): the callback is kept inside the transaction so that a
            # failing callback rolls the cancellation back - confirm this nesting
            # against the original file.
            if self.cancel_callback_url and trigger_callback:
                r = requests_wrapper.post(self.cancel_callback_url, remote_service='auto', timeout=15)
                r.raise_for_status()

    def accept(self):
        '''Take this booking and its secondary bookings out of the waiting list.'''
        self.in_waiting_list = False
        with transaction.atomic():
            self.secondary_booking_set.update(in_waiting_list=False)
            self.save()

    def suspend(self):
        '''Put this booking and its secondary bookings in the waiting list.'''
        self.in_waiting_list = True
        with transaction.atomic():
            self.secondary_booking_set.update(in_waiting_list=True)
            self.save()

    def reset_user_was_present(self):
        '''Clear the presence check of this booking and its secondary bookings.

        The event is also flagged as not checked.
        '''
        self.user_check_type_slug = None
        self.user_check_type_label = None
        self.user_was_present = None
        with transaction.atomic():
            # a single UPDATE query for all the reset columns
            self.secondary_booking_set.update(
                user_check_type_slug=None,
                user_check_type_label=None,
                user_was_present=None,
            )
            self.save()
            self.event.checked = False
            self.event.save(update_fields=['checked'])

    def _record_user_check(self, was_present, check_type_slug, check_type_label):
        '''Store a presence/absence check on this booking and its secondary bookings.'''
        self.user_check_type_slug = check_type_slug
        self.user_check_type_label = check_type_label
        self.user_was_present = was_present
        self.cancellation_datetime = None
        with transaction.atomic():
            # a single UPDATE query for all the checked columns
            self.secondary_booking_set.update(
                user_check_type_slug=check_type_slug,
                user_check_type_label=check_type_label,
                user_was_present=was_present,
                cancellation_datetime=None,
            )
            self.save()
            self.event.set_is_checked()

    def mark_user_absence(self, check_type_slug=None, check_type_label=None):
        '''Record the user as absent (this also clears any cancellation).'''
        self._record_user_check(False, check_type_slug, check_type_label)

    def mark_user_presence(self, check_type_slug=None, check_type_label=None):
        '''Record the user as present (this also clears any cancellation).'''
        self._record_user_check(True, check_type_slug, check_type_label)

    def get_user_block(self):
        '''Render the agenda's booking user block template, HTML-escaped.

        Returns ``None`` when the template is invalid or references a
        missing variable.
        '''
        template_vars = Context(settings.TEMPLATE_VARS, autoescape=False)
        template_vars.update(
            {
                'booking': self,
            }
        )
        try:
            return escape(Template(self.event.agenda.get_booking_user_block_template()).render(template_vars))
        except (VariableDoesNotExist, TemplateSyntaxError):
            return

    def get_extra_user_block(self, request):
        '''Render the agenda's extra user block template with a request context.

        Returns ``None`` when the template is invalid or references a
        missing variable.
        '''
        context = RequestContext(request)
        context.update(
            {
                'booking': self,
            }
        )
        try:
            return Template(self.event.agenda.booking_extra_user_block_template).render(context)
        except (VariableDoesNotExist, TemplateSyntaxError):
            return

    @classmethod
    def anonymize_bookings(cls, bookings_queryset):
        '''Blank the personal data of all bookings of ``bookings_queryset``.

        Labels, user identity, contact details (email addresses and phone
        numbers) and extra data are emptied, and the anonymization datetime
        is set to now.
        '''
        bookings_queryset.update(
            label='',
            user_display_label='',
            user_external_id='',
            user_last_name='',
            user_first_name='',
            # contact details identify the user just as much as names do
            user_email='',
            user_phone_number='',
            extra_emails=[],
            extra_phone_numbers=[],
            extra_data={},
            anonymization_datetime=now(),
        )

    def get_ics(self, request=None):
        '''Serialize this booking as an iCalendar document.

        Query parameters of ``request`` (``description``, ``location``,
        ``comment``, ``url``) take precedence over the same keys of
        ``extra_data``; ``organizer=no`` disables the organizer property.
        '''
        ics = vobject.iCalendar()
        ics.add('prodid').value = '-//Entr\'ouvert//NON SGML Publik'

        vevent = vobject.newFromBehavior('vevent')
        vevent.add('uid').value = '%s-%s-%s' % (
            self.event.start_datetime.isoformat(),
            self.event.agenda.pk,
            self.pk,
        )

        vevent.add('summary').value = self.user_display_label or self.label
        vevent.add('dtstart').value = self.event.start_datetime
        if self.user_name:
            vevent.add('attendee').value = self.user_name
        if request is None or request.GET.get('organizer') != 'no':
            template_vars = getattr(settings, 'TEMPLATE_VARS', {})
            organizer_name = template_vars.get('global_title', 'chrono')
            organizer_email = template_vars.get('default_from_email', 'chrono@example.net')
            organizer = vevent.add('organizer')
            organizer.value = f'mailto:{organizer_email}'
            organizer.cn_param = organizer_name

        if self.event.end_datetime:
            vevent.add('dtend').value = self.event.end_datetime

        stored_values = self.extra_data or {}
        for field in ('description', 'location', 'comment', 'url'):
            requested_value = request.GET.get(field) if request else None
            field_value = requested_value or stored_values.get(field)
            if field_value:
                vevent.add(field).value = field_value
        ics.add(vevent)
        return ics.serialize()

    def clone(self, primary_booking=None, save=True):
        '''Return a copy of this booking attached to ``primary_booking``.

        The copy has no id; it is stored only when ``save`` is true.
        '''
        new_booking = copy.deepcopy(self)
        new_booking.id = None
        new_booking.primary_booking = primary_booking
        if save:
            new_booking.save()
        return new_booking

    def events_display(self):
        '''Display string made of the user name (or label, or "Anonymous") and the creation datetime.'''
        name = self.user_name or self.label or _('Anonymous')
        return '%s, %s' % (name, date_format(localtime(self.creation_datetime), 'DATETIME_FORMAT'))

    def get_form_url(self):
        '''``form_url`` passed through ``translate_from_publik_url``.'''
        return translate_from_publik_url(self.form_url)

    def get_backoffice_url(self):
        '''``backoffice_url`` passed through ``translate_from_publik_url``.'''
        return translate_from_publik_url(self.backoffice_url)
# (begin, end) pair describing one opening range, as built by Desk.get_opening_hours()
OpeningHour = collections.namedtuple('OpeningHour', ['begin', 'end'])
class Desk(models.Model):
    '''A desk of a (non virtual) agenda, with its time periods and exceptions.'''

    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160)

    def __str__(self):
        return self.label

    class Meta:
        ordering = ['label', 'slug']
        unique_together = ['agenda', 'slug']

    def save(self, *args, **kwargs):
        '''Generate a slug (unique within the agenda) when none is set, then save.'''
        assert self.agenda.kind != 'virtual', "a desk can't reference a virtual agenda"
        if not self.slug:
            self.slug = generate_slug(self, agenda=self.agenda)
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        '''Slugified label.'''
        return slugify(self.label)

    @classmethod
    def import_json(cls, data):
        '''Create or update a desk and its related objects from exported data.

        ``data`` is modified in place: the related object lists are popped
        from it. Raises ``AgendaImportError`` when a referenced
        unavailability calendar does not exist.
        '''
        timeperiods = data.pop('timeperiods', [])
        exceptions = data.pop('exceptions', [])
        sources = data.pop('exception_sources', [])
        unavailability_calendars = data.pop('unavailability_calendars', [])
        data = clean_import_data(cls, data)
        desk, dummy = cls.objects.update_or_create(slug=data['slug'], agenda=data['agenda'], defaults=data)
        for timeperiod in timeperiods:
            timeperiod['desk'] = desk
            TimePeriod.import_json(timeperiod)
        for exception in exceptions:
            exception['desk'] = desk
            TimePeriodException.import_json(exception)
        for source in sources:
            source['desk'] = desk
            TimePeriodExceptionSource.import_json(source)
        for unavailability_calendar in unavailability_calendars:
            slug = unavailability_calendar['slug']
            try:
                target_calendar = UnavailabilityCalendar.objects.get(slug=slug)
            except UnavailabilityCalendar.DoesNotExist:
                raise AgendaImportError(_('The unavailability calendar "%s" does not exist.') % slug)
            desk.unavailability_calendars.add(target_calendar)

    def export_json(self):
        '''Serialize the desk with its time periods, exceptions, settings sources and calendars.

        Only exceptions whose source has no settings slug, and only sources
        having a settings slug, are exported.
        '''
        time_period_exceptions = self.timeperiodexception_set.filter(source__settings_slug__isnull=True)
        time_period_exception_sources = self.timeperiodexceptionsource_set.filter(settings_slug__isnull=False)
        return {
            'label': self.label,
            'slug': self.slug,
            'timeperiods': [time_period.export_json() for time_period in self.timeperiod_set.filter()],
            'exceptions': [exception.export_json() for exception in time_period_exceptions],
            'exception_sources': [source.export_json() for source in time_period_exception_sources],
            'unavailability_calendars': [{'slug': x.slug} for x in self.unavailability_calendars.all()],
        }

    def duplicate(self, label=None, agenda_target=None, reset_slug=True):
        '''Store and return a copy of this desk, with copies of its related objects.

        ``label`` and ``agenda_target`` override the copied values when
        given; the slug is regenerated unless ``reset_slug`` is false.
        '''
        # clone current desk
        new_desk = copy.deepcopy(self)
        new_desk.pk = None
        # set label
        new_desk.label = label or new_desk.label
        # reset slug
        if reset_slug:
            new_desk.slug = None
        # set agenda
        if agenda_target:
            new_desk.agenda = agenda_target
        # store new desk
        new_desk.save()

        # clone related objects
        for time_period in self.timeperiod_set.all():
            time_period.duplicate(desk_target=new_desk)
        # exceptions attached to a source are cloned together with their source
        for time_period_exception in self.timeperiodexception_set.filter(source__isnull=True):
            time_period_exception.duplicate(desk_target=new_desk)
        for time_period_exception_source in self.timeperiodexceptionsource_set.all():
            time_period_exception_source.duplicate(desk_target=new_desk)
        new_desk.unavailability_calendars.set(self.unavailability_calendars.all())

        return new_desk

    def get_exceptions_within_two_weeks(self):
        '''Return the current and upcoming exceptions to display.

        Exceptions not yet ended that either end within two weeks or have
        already started are returned; when there is none, the first
        exception starting from now on is returned alone, if any.
        '''
        # prefetched_exceptions contains desks exceptions + unavailability_calendars exceptions
        # default ordering: start_datetime
        # a single snapshot of the current time keeps all comparisons consistent
        current = now()
        in_two_weeks = make_aware(datetime.datetime.today() + datetime.timedelta(days=14))
        exceptions = []
        for exception in self.prefetched_exceptions:
            if exception.end_datetime < current:
                # exception ends in the past, skip it
                continue
            if exception.end_datetime <= in_two_weeks:
                # ends in less than 2 weeks
                exceptions.append(exception)
            elif exception.start_datetime < current:
                # has already started
                exceptions.append(exception)
        if exceptions:
            return exceptions

        # if none found within the 2 coming weeks, return the next one
        for exception in self.prefetched_exceptions:
            if exception.start_datetime < current:
                # exception starts in the past, skip it
                continue
            # returns the first exception found
            return [exception]
        return []

    def are_all_exceptions_displayed(self):
        '''Tell whether ``get_exceptions_within_two_weeks`` returns as many exceptions as are prefetched.'''
        in_two_weeks = self.get_exceptions_within_two_weeks()
        return len(self.prefetched_exceptions) == len(in_two_weeks)

    def get_opening_hours(self, date):
        '''Return the ``OpeningHour`` ranges of this desk on ``date``.

        Matching time periods are collected for the day, then the
        prefetched exceptions overlapping the day are subtracted.
        '''
        openslots = IntervalSet()
        weekday_index = get_weekday_index(date)
        real_date = date.date() if isinstance(date, datetime.datetime) else date
        for timeperiod in self.timeperiod_set.all():
            if timeperiod.weekday_indexes and weekday_index not in timeperiod.weekday_indexes:
                continue
            # timeperiod_set.all() are prefetched, do not filter in queryset
            if timeperiod.date != real_date and timeperiod.weekday != date.weekday():
                continue
            start_datetime = make_aware(datetime.datetime.combine(date, timeperiod.start_time))
            end_datetime = make_aware(datetime.datetime.combine(date, timeperiod.end_time))
            openslots.add(start_datetime, end_datetime)

        aware_date = make_aware(datetime.datetime(date.year, date.month, date.day))
        exceptions = IntervalSet()
        aware_next_date = aware_date + datetime.timedelta(days=1)
        for exception in self.prefetched_exceptions:
            if exception.end_datetime < aware_date:
                continue
            if exception.start_datetime > aware_next_date:
                continue
            exceptions.add(exception.start_datetime, exception.end_datetime)

        return [OpeningHour(*time_range) for time_range in (openslots - exceptions)]

    def import_timeperiod_exceptions_from_settings(self, enable=False, spool=True):
        '''Synchronise this desk's exception sources with ``settings.EXCEPTIONS_SOURCES``.

        A (disabled) source is created for each settings entry missing on
        the desk; sources are enabled when ``enable`` is true or refreshed
        when already enabled. Settings sources not updated during this run
        are deleted.
        '''
        start_update = now()
        for slug, source_info in settings.EXCEPTIONS_SOURCES.items():
            label = source_info['label']
            try:
                source = TimePeriodExceptionSource.objects.get(desk=self, settings_slug=slug)
            except TimePeriodExceptionSource.DoesNotExist:
                source = TimePeriodExceptionSource.objects.create(
                    desk=self, settings_slug=slug, enabled=False
                )
            source.settings_label = _(label)
            source.save()
            if enable or source.enabled:  # if already enabled, update anyway
                source.enable(spool=spool)
        TimePeriodExceptionSource.objects.filter(
            desk=self, settings_slug__isnull=False, last_update__lt=start_update
        ).delete()  # source was not in settings anymore
class Resource(models.Model):
    '''A resource identified by a unique slug, with a label and an optional description.'''

    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
    label = models.CharField(_('Label'), max_length=150)
    description = models.TextField(_('Description'), blank=True, help_text=_('Optional description.'))

    def __str__(self):
        return self.label

    class Meta:
        ordering = ['label']

    def save(self, *args, **kwargs):
        '''Generate a slug when none is set, then save.'''
        if not self.slug:
            generated = generate_slug(self)
            self.slug = generated
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        '''Slugified label.'''
        return slugify(self.label)

    def can_be_viewed(self, user):
        '''Staff users can view; others need a group that is the edit role of a related agenda.'''
        if user.is_staff:
            return True
        user_group_ids = [group.id for group in user.groups.all()]
        editable_agendas = self.agenda_set.filter(edit_role_id__in=user_group_ids)
        return editable_agendas.exists()

    @classmethod
    def import_json(cls, data, overwrite=False):
        '''Create or update a resource from exported data.

        Returns a ``(created, resource)`` tuple. ``overwrite`` is accepted
        but not used here.
        '''
        cleaned = clean_import_data(cls, data)
        identifier = cleaned.pop('slug')
        resource, created = cls.objects.update_or_create(slug=identifier, defaults=cleaned)
        return created, resource

    def export_json(self):
        '''Serialize slug, label and description as a dict.'''
        exported = {}
        exported['slug'] = self.slug
        exported['label'] = self.label
        exported['description'] = self.description
        return exported
class Category(models.Model):
    '''A category identified by a unique slug, with a label.'''

    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
    label = models.CharField(_('Label'), max_length=150)

    def __str__(self):
        return self.label

    class Meta:
        ordering = ['label']

    def save(self, *args, **kwargs):
        '''Generate a slug when none is set, then save.'''
        if not self.slug:
            generated = generate_slug(self)
            self.slug = generated
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        '''Slugified label.'''
        return slugify(self.label)

    @classmethod
    def import_json(cls, data, overwrite=False):
        '''Create or update a category from exported data.

        Returns a ``(created, category)`` tuple. ``overwrite`` is accepted
        but not used here.
        '''
        cleaned = clean_import_data(cls, data)
        identifier = cleaned.pop('slug')
        category, created = cls.objects.update_or_create(slug=identifier, defaults=cleaned)
        return created, category

    def export_json(self):
        '''Serialize label and slug as a dict.'''
        exported = {}
        exported['label'] = self.label
        exported['slug'] = self.slug
        return exported
def ics_directory_path(instance, filename):
    '''Build the upload path ``ics/<random uuid4>/<filename>``; ``instance`` is not used.'''
    random_directory = uuid.uuid4()
    return 'ics/{}/{}'.format(random_directory, filename)
class TimePeriodExceptionSource(models.Model):
    '''A source of time period exceptions: an ICS file, an ICS URL or a settings entry.'''

    desk = models.ForeignKey(Desk, on_delete=models.CASCADE, null=True)
    unavailability_calendar = models.ForeignKey('UnavailabilityCalendar', on_delete=models.CASCADE, null=True)
    ics_filename = models.CharField(null=True, max_length=256)
    ics_file = models.FileField(upload_to=ics_directory_path, blank=True, null=True)
    ics_url = models.URLField(null=True, max_length=500)
    settings_slug = models.CharField(null=True, max_length=150)
    settings_label = models.CharField(null=True, max_length=150)
    last_update = models.DateTimeField(auto_now=True, null=True)
    enabled = models.BooleanField(default=True)

    class Meta:
        unique_together = ['desk', 'settings_slug']

    def __str__(self):
        if self.ics_filename is not None:
            return self.ics_filename
        if self.settings_label is not None:
            return gettext(self.settings_label)
        return self.ics_url

    def duplicate(self, desk_target=None):
        '''Store and return a copy of this source, with copies of its file and exceptions.

        The copy is attached to ``desk_target`` when given, to the current
        desk otherwise.
        '''
        # clone current source
        new_source = copy.deepcopy(self)
        new_source.pk = None
        # set desk
        new_source.desk = desk_target or self.desk
        # set ics_file
        if self.ics_file:
            # copy the raw bytes: text mode would depend on the platform default encoding
            with open(self.ics_file.path, 'rb') as ics_file:
                new_source.ics_file.save(self.ics_filename, ics_file, save=False)
        # store new source
        new_source.save()
        # clone related objects
        for time_period_exception in self.timeperiodexception_set.all():
            time_period_exception.duplicate(desk_target=desk_target, source_target=new_source)

        return new_source

    def enable(self, spool=True):
        '''Enable the source and refresh its exceptions from settings.

        When ``spool`` is true and uwsgi is loaded, the refresh is handed to
        the spooler once the current transaction is committed; otherwise it
        is done immediately.
        '''
        self.enabled = True
        self.save()

        if spool and 'uwsgi' in sys.modules:
            from chrono.utils.spooler import refresh_exceptions_from_settings

            tenant = getattr(connection, 'tenant', None)
            transaction.on_commit(
                lambda: refresh_exceptions_from_settings.spool(
                    source_id=str(self.pk), domain=getattr(tenant, 'domain_url', None)
                )
            )
            return

        self.refresh_from_settings()

    def refresh_from_settings(self):
        '''Replace the exceptions of this source by the holidays of its settings calendar.

        One full-day exception is created per holiday of the current year
        and the two following ones. Nothing is done when the source is
        disabled or unknown in ``settings.EXCEPTIONS_SOURCES``.
        '''
        if not self.enabled:
            return

        source_info = settings.EXCEPTIONS_SOURCES.get(self.settings_slug)
        if not source_info:
            return
        source_class = import_string(source_info['class'])
        calendar = source_class()
        this_year = now().year
        days = [day for year in range(this_year, this_year + 3) for day in calendar.holidays(year)]
        with transaction.atomic():
            self.timeperiodexception_set.all().delete()
            for day, label in days:
                start_datetime = make_aware(datetime.datetime.combine(day, datetime.datetime.min.time()))
                end_datetime = start_datetime + datetime.timedelta(days=1)
                TimePeriodException.objects.create(
                    desk=self.desk,
                    source=self,
                    label=_(label),
                    start_datetime=start_datetime,
                    end_datetime=end_datetime,
                )

    def disable(self):
        '''Delete the exceptions of this source and mark it as disabled.'''
        self.timeperiodexception_set.all().delete()
        self.enabled = False
        self.save()

    def render_ics_url(self):
        '''Render ``ics_url`` as a template against ``settings.TEMPLATE_VARS``.'''
        return Template(self.ics_url).render(Context(settings.TEMPLATE_VARS))

    def _check_ics_content(self):
        '''Fetch (URL) or read (file) the ICS content, validate it and return the parsed calendar.

        Raises ``ICSError`` when the remote calendar cannot be retrieved,
        the content cannot be parsed, it holds no event, or an event has no
        start date.
        '''
        if self.ics_url:
            ics_url = self.render_ics_url()
            try:
                response = requests.get(ics_url, proxies=settings.REQUESTS_PROXIES, timeout=15)
                response.raise_for_status()
            except requests.HTTPError as e:
                raise ICSError(
                    _('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).')
                    % {'url': ics_url, 'status_code': e.response.status_code}
                ) from e
            except requests.RequestException as e:
                raise ICSError(
                    _('Failed to retrieve remote calendar (%(url)s, %(exception)s).')
                    % {'url': ics_url, 'exception': e}
                ) from e
            try:
                # override response encoding received in HTTP headers as it may
                # often be missing and defaults to iso-8859-15.
                response.content.decode('utf-8')
                response.encoding = 'utf-8'
            except UnicodeDecodeError:
                # not valid UTF-8: keep the encoding determined by requests
                pass
            data = response.text
        else:
            data = force_str(self.ics_file.read())

        try:
            parsed = vobject.readOne(data)
        except vobject.base.ParseError as e:
            raise ICSError(_('File format is invalid.')) from e

        if not parsed.contents.get('vevent'):
            raise ICSError(_('The file doesn\'t contain any events.'))

        for vevent in parsed.contents.get('vevent', []):
            summary = self._get_summary_from_vevent(vevent)
            try:
                vevent.dtstart.value
            except AttributeError as e:
                raise ICSError(_('Event "%s" has no start date.') % summary) from e

        return parsed

    def _get_summary_from_vevent(self, vevent):
        '''Return the summary of ``vevent``, or a default translated label.'''
        if 'summary' in vevent.contents:
            return force_str(vevent.contents['summary'][0].value)
        return _('Exception')

    def refresh_timeperiod_exceptions(self, data=None):
        '''Refresh the exceptions from the ICS content.

        When uwsgi is loaded the refresh is handed to the spooler once the
        current transaction is committed (``data`` is then not forwarded);
        otherwise it is done immediately.
        '''
        if 'uwsgi' in sys.modules:
            from chrono.utils.spooler import refresh_exception_source

            tenant = getattr(connection, 'tenant', None)
            transaction.on_commit(
                lambda: refresh_exception_source.spool(
                    source_id=str(self.pk), domain=getattr(tenant, 'domain_url', None)
                )
            )
            return

        self.refresh_timeperiod_exceptions_from_ics(data=data)

    def refresh_timeperiod_exceptions_from_ics(self, data=None, recurring_days=600):
        '''Replace the exceptions of this source by the events of the ICS content.

        ``data`` is an already parsed calendar; when ``None`` the content is
        fetched and checked first. Recurring events are expanded up to
        ``recurring_days`` days from now, and occurrences already ended are
        not stored. For an unavailability calendar source, exceptions are
        also grouped by the first category of their event.

        Raises ``ICSError`` when an event has no start date.
        '''
        if data is None:
            parsed = self._check_ics_content()
        else:
            parsed = data

        categories = collections.defaultdict(list)
        with transaction.atomic():
            # delete old exceptions related to this source
            self.timeperiodexception_set.all().delete()
            # create new exceptions
            update_datetime = now()
            for vevent in parsed.contents.get('vevent', []):
                summary = self._get_summary_from_vevent(vevent)
                try:
                    start_dt = vevent.dtstart.value
                    if not isinstance(start_dt, datetime.datetime):
                        start_dt = datetime.datetime.combine(start_dt, datetime.datetime.min.time())
                    if not is_aware(start_dt):
                        start_dt = make_aware(start_dt)
                except AttributeError as e:
                    raise ICSError(_('Event "%s" has no start date.') % summary) from e
                try:
                    end_dt = vevent.dtend.value
                    if not isinstance(end_dt, datetime.datetime):
                        end_dt = datetime.datetime.combine(end_dt, datetime.datetime.min.time())
                    if not is_aware(end_dt):
                        end_dt = make_aware(end_dt)
                    duration = end_dt - start_dt
                except AttributeError:
                    try:
                        duration = vevent.duration.value
                        end_dt = start_dt + duration
                    except AttributeError:
                        # events without end date are considered as ending the same day
                        end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time()))
                        duration = end_dt - start_dt

                event = {
                    'start_datetime': start_dt,
                    'end_datetime': end_dt,
                    'label': summary,
                    'desk_id': self.desk_id,
                    'unavailability_calendar_id': self.unavailability_calendar_id,
                    'source': self,
                    'recurrence_id': 0,
                }

                if 'categories' in vevent.contents and len(vevent.categories.value) > 0:
                    category = vevent.categories.value[0]
                else:
                    category = None

                if not vevent.rruleset:
                    # classical event
                    exception = TimePeriodException.objects.create(**event)
                    if category:
                        categories[category].append(exception)
                elif vevent.rruleset.count():
                    # recurring event until recurring_days in the future
                    from_dt = start_dt
                    until_dt = update_datetime + datetime.timedelta(days=recurring_days)
                    # compare with the same awareness as the occurrences of the rule set
                    if not is_aware(vevent.rruleset[0]):
                        from_dt = make_naive(from_dt)
                        until_dt = make_naive(until_dt)
                    for i, start_dt in enumerate(vevent.rruleset.between(from_dt, until_dt, inc=True)):
                        # recompute start_dt and end_dt from occurrences and duration
                        if not is_aware(start_dt):
                            start_dt = make_aware(start_dt)
                        end_dt = start_dt + duration
                        event['recurrence_id'] = i
                        event['start_datetime'] = start_dt
                        event['end_datetime'] = end_dt
                        if end_dt >= update_datetime:
                            exception = TimePeriodException.objects.create(**event)
                            if category:
                                categories[category].append(exception)

            if self.unavailability_calendar_id:
                for category, exceptions in categories.items():
                    exception_group, dummy = TimePeriodExceptionGroup.objects.get_or_create(
                        unavailability_calendar_id=self.unavailability_calendar_id,
                        slug=category,
                        defaults={'label': exceptions[0].label},
                    )
                    exception_group.exceptions.add(*exceptions)

    @classmethod
    def import_json(cls, data):
        '''Create or update a settings source from exported data, and enable it if flagged so.'''
        data = clean_import_data(cls, data)
        desk = data.pop('desk')
        settings_slug = data.pop('settings_slug')
        # do not bind the result to "_": it would shadow the translation function
        source, dummy = cls.objects.update_or_create(desk=desk, settings_slug=settings_slug, defaults=data)
        if source.enabled:
            source.enable()

    def export_json(self):
        '''Export only sources from settings.'''
        return {
            'settings_slug': self.settings_slug,
            'settings_label': self.settings_label,
            'enabled': self.enabled,
        }
class UnavailabilityCalendar(models.Model):
    '''A calendar of exceptions shared by several desks, with edit and view roles.'''

    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
    desks = models.ManyToManyField(Desk, related_name='unavailability_calendars')
    edit_role = models.ForeignKey(
        Group,
        blank=True,
        null=True,
        default=None,
        related_name='+',
        verbose_name=_('Edit Role'),
        on_delete=models.SET_NULL,
    )
    view_role = models.ForeignKey(
        Group,
        blank=True,
        null=True,
        default=None,
        related_name='+',
        verbose_name=_('View Role'),
        on_delete=models.SET_NULL,
    )

    class Meta:
        ordering = ['label']

    def __str__(self):
        return self.label

    @property
    def base_slug(self):
        '''Slugified label.'''
        return slugify(self.label)

    def save(self, *args, **kwargs):
        '''Generate a slug when none is set, then save.'''
        if not self.slug:
            generated = generate_slug(self)
            self.slug = generated
        super().save(*args, **kwargs)

    def can_be_managed(self, user):
        '''Staff users can manage; others must belong to the edit role.'''
        if user.is_staff:
            return True
        user_group_ids = [group.id for group in user.groups.all()]
        return self.edit_role_id in user_group_ids

    def can_be_viewed(self, user):
        '''Users who can manage can view; others must belong to the view role.'''
        if self.can_be_managed(user):
            return True
        user_group_ids = [group.id for group in user.groups.all()]
        return self.view_role_id in user_group_ids

    def get_absolute_url(self):
        '''URL of the manager view of this calendar.'''
        return reverse('chrono-manager-unavailability-calendar-view', kwargs={'pk': self.id})

    def export_json(self):
        '''Serialize the calendar with its role names and its exceptions.'''
        view_role_name = self.view_role.name if self.view_role else None
        edit_role_name = self.edit_role.name if self.edit_role else None
        return {
            'label': self.label,
            'slug': self.slug,
            'permissions': {
                'view': view_role_name,
                'edit': edit_role_name,
            },
            'exceptions': [exception.export_json() for exception in self.timeperiodexception_set.all()],
        }

    @classmethod
    def import_json(cls, data, overwrite=False):
        '''Create or update a calendar and import its exceptions.

        Roles are looked up by group name. When ``overwrite`` is true the
        existing exceptions of the calendar are deleted first. Returns a
        ``(created, unavailability_calendar)`` tuple.
        '''
        payload = data.copy()
        permissions = payload.pop('permissions', {})
        exceptions = payload.pop('exceptions', [])
        for permission in ('view', 'edit'):
            role_name = permissions.get(permission)
            if role_name:
                payload[permission + '_role'] = Group.objects.get(name=role_name)
        payload = clean_import_data(cls, payload)
        unavailability_calendar, created = cls.objects.update_or_create(
            slug=payload['slug'], defaults=payload
        )
        if overwrite:
            TimePeriodException.objects.filter(unavailability_calendar=unavailability_calendar).delete()
        for exception in exceptions:
            exception['unavailability_calendar'] = unavailability_calendar
            TimePeriodException.import_json(exception)
        return created, unavailability_calendar
class TimePeriodExceptionGroup(models.Model):
    '''A labelled group of exceptions belonging to an unavailability calendar.

    Exceptions reference their group through ``TimePeriodException.group``
    (reverse accessor: ``exceptions``).
    '''

    unavailability_calendar = models.ForeignKey(UnavailabilityCalendar, on_delete=models.CASCADE)
    slug = models.SlugField(_('Identifier'), max_length=160)
    label = models.CharField(_('Label'), max_length=150)

    class Meta:
        ordering = ['label']
        # a slug identifies a group within its calendar only
        unique_together = ['unavailability_calendar', 'slug']

    def __str__(self):
        return self.label
class TimePeriodException(models.Model):
    '''A period of unavailability attached to a desk or an unavailability calendar.'''

    desk = models.ForeignKey(Desk, on_delete=models.CASCADE, null=True)
    unavailability_calendar = models.ForeignKey(UnavailabilityCalendar, on_delete=models.CASCADE, null=True)
    source = models.ForeignKey(TimePeriodExceptionSource, on_delete=models.CASCADE, null=True)
    label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
    start_datetime = models.DateTimeField(_('Exception start time'))
    end_datetime = models.DateTimeField(_('Exception end time'))
    update_datetime = models.DateTimeField(auto_now=True)
    recurrence_id = models.PositiveIntegerField(_('Recurrence ID'), default=0)
    group = models.ForeignKey(
        TimePeriodExceptionGroup, on_delete=models.CASCADE, null=True, related_name='exceptions'
    )

    @property
    def read_only(self):
        '''True when the exception comes from a source or from an unavailability calendar.'''
        if self.source_id:
            return True
        if self.unavailability_calendar_id:
            return True
        return False

    class Meta:
        ordering = ['start_datetime']

    def __str__(self):
        '''Human readable period, prefixed by the label when there is one.

        A single full day is shown as one date; any other period is shown
        as "start → end", without the time part when both bounds are at
        midnight and without repeating the date when both are on the same
        day.
        '''
        start = localtime(self.start_datetime)
        end = localtime(self.end_datetime)
        if is_midnight(self.start_datetime) and is_midnight(self.end_datetime):
            # if both dates are at midnight don't include the time part
            if self.end_datetime == self.start_datetime + datetime.timedelta(days=1):
                # a single day
                exc_repr = '%s' % date_format(start, 'SHORT_DATE_FORMAT')
            else:
                # start and end are separated, otherwise the two dates run together
                exc_repr = '%s → %s' % (
                    date_format(start, 'SHORT_DATE_FORMAT'),
                    date_format(end, 'SHORT_DATE_FORMAT'),
                )
        elif start.date() == end.date():
            # same day
            exc_repr = '%s → %s' % (
                date_format(start, 'SHORT_DATETIME_FORMAT'),
                date_format(end, 'TIME_FORMAT'),
            )
        else:
            exc_repr = '%s → %s' % (
                date_format(start, 'SHORT_DATETIME_FORMAT'),
                date_format(end, 'SHORT_DATETIME_FORMAT'),
            )

        if self.label:
            exc_repr = '%s (%s)' % (self.label, exc_repr)

        return exc_repr

    def has_booking_within_time_slot(self, target_desk=None):
        '''Tell whether a booked, non-cancelled event overlaps this exception.

        Events are looked up on the exception's desk, else on
        ``target_desk``, else on the desks of the unavailability calendar.
        An event overlaps when it starts within the exception, or when it
        has a meeting type and the exception starts during the meeting.
        '''
        if not (self.start_datetime and self.end_datetime):
            # incomplete time period, can't tell
            return False

        query = Event.objects
        if self.desk:
            query = query.filter(desk=self.desk)
        elif self.unavailability_calendar and not target_desk:
            query = query.filter(desk__in=self.unavailability_calendar.desks.all())
        elif target_desk:
            query = query.filter(desk=target_desk)
        else:
            # orphan exception
            return False

        for event in query.filter(booking__isnull=False, booking__cancellation_datetime__isnull=True):
            if self.start_datetime <= event.start_datetime < self.end_datetime:
                return True
            if event.meeting_type:
                if (
                    event.start_datetime
                    <= self.start_datetime
                    < event.start_datetime + datetime.timedelta(minutes=event.meeting_type.duration)
                ):
                    return True
        return False

    @classmethod
    def import_json(cls, data):
        '''Create or update an exception from exported data.

        Values of keys ending with ``_datetime`` are parsed in place in
        ``data``. Raises ``AgendaImportError`` on a badly formatted
        datetime. When several rows match, only their update datetime is
        refreshed.
        '''

        def import_datetime(s):
            '''Import datetime as a naive ISO8601 serialization'''
            try:
                return make_aware(datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S'))
            except ValueError:
                raise AgendaImportError(_('Bad datetime format "%s"') % s)

        for k, v in data.items():
            if k.endswith('_datetime'):
                data[k] = import_datetime(v)

        data = clean_import_data(cls, data)
        # every field but the update datetime identifies the exception
        query_data = data.copy()
        query_data.pop('update_datetime')
        try:
            cls.objects.update_or_create(defaults=data, **query_data)
        except cls.MultipleObjectsReturned:
            cls.objects.filter(**query_data).update(update_datetime=data['update_datetime'])

    def export_json(self):
        '''Serialize label, bounds, recurrence id and update datetime as a dict.'''

        def export_datetime(dt):
            '''Export datetime as a naive ISO8601 serialization'''
            return make_naive(dt).strftime('%Y-%m-%d %H:%M:%S')

        return {
            'label': self.label,
            'start_datetime': export_datetime(self.start_datetime),
            'end_datetime': export_datetime(self.end_datetime),
            'recurrence_id': self.recurrence_id,
            'update_datetime': export_datetime(self.update_datetime),
        }

    def duplicate(self, desk_target=None, source_target=None):
        '''Store and return a copy of this exception.

        The copy is attached to ``desk_target`` and ``source_target`` when
        given, to the current desk and source otherwise.
        '''
        # clone current exception
        new_exception = copy.deepcopy(self)
        new_exception.pk = None
        # set desk
        new_exception.desk = desk_target or self.desk
        # set source
        new_exception.source = source_target or self.source
        # store new exception
        new_exception.save()

        return new_exception

    def as_interval(self):
        '''Simplify insertion into IntervalSet'''
        return Interval(self.start_datetime, self.end_datetime)
class EventCancellationReport(models.Model):
    '''Report attached to an event, listing bookings and booking errors.'''

    event = models.ForeignKey(Event, related_name='cancellation_reports', on_delete=models.CASCADE)
    timestamp = models.DateTimeField(auto_now_add=True)
    seen = models.BooleanField(default=False)
    bookings = models.ManyToManyField(Booking)
    booking_errors = models.JSONField(default=dict)

    def __str__(self):
        formatted_timestamp = self.timestamp.strftime('%Y-%m-%d %H:%M:%S')
        return f'{formatted_timestamp} - {self.event}'

    class Meta:
        ordering = ['-timestamp']
class RecurrenceExceptionsReport(models.Model):
    '''Per-agenda report (at most one per agenda) referencing a set of events.

    NOTE(review): presumably the events are recurrences conflicting with
    exceptions - confirm against the code filling this report.
    '''

    agenda = models.OneToOneField(
        Agenda, related_name='recurrence_exceptions_report', on_delete=models.CASCADE
    )
    events = models.ManyToManyField(Event)
class NotificationType:
    '''One notification kind of a notifications settings object.

    ``name`` is the settings attribute holding the recipient choice; the
    manually specified addresses live in the ``<name>_emails`` attribute.
    '''

    def __init__(self, name, related_field, settings):
        self.name = name
        self.related_field = related_field
        self.settings = settings

    @property
    def enabled(self):
        '''True when a choice is set; a manual choice also needs at least one address.'''
        choice = getattr(self.settings, self.name)
        if not choice:
            return False

        if choice == self.settings.EMAIL_FIELD:
            return bool(getattr(self.settings, self.name + '_emails'))

        return True

    def get_recipients(self):
        '''Return the list of email addresses to notify.

        For a manual choice these are the specified addresses; for a role
        choice, the role emails plus the member emails when the role asks
        for it. A new list is returned on each call, so callers can modify
        it without altering the settings or the role.
        '''
        choice = getattr(self.settings, self.name)
        if not choice:
            return []

        if choice == self.settings.EMAIL_FIELD:
            # the stored list may be null
            return list(getattr(self.settings, self.name + '_emails') or [])

        role = self.settings.get_role_from_choice(choice)
        if not role or not hasattr(role, 'role'):
            return []
        # work on a copy: extending the role's own list would make it grow on every call
        emails = list(role.role.emails)
        if role.role.emails_to_members:
            emails.extend(role.user_set.values_list('email', flat=True))
        return emails

    @property
    def display_value(self):
        '''Text describing the recipients: the addresses, or the choice label and its role.'''
        choice = getattr(self.settings, self.name)
        if not choice:
            return ''

        if choice == self.settings.EMAIL_FIELD:
            emails = getattr(self.settings, self.name + '_emails')
            # the stored list may be null
            return ', '.join(emails or [])

        role = self.settings.get_role_from_choice(choice) or _('undefined')
        display_name = getattr(self.settings, 'get_%s_display' % self.name)()
        return '%s (%s)' % (display_name, role)

    @property
    def label(self):
        '''Verbose name of the settings field named ``name``.'''
        return self.settings._meta.get_field(self.name).verbose_name
class AgendaNotificationsSettings(models.Model):
    """Notification recipients configured for an agenda.

    For each kind of notification the recipient is either a role of the
    agenda (edit or view role) or a list of manually entered addresses.
    """

    EMAIL_FIELD = 'use-email-field'
    VIEW_ROLE = 'view-role'
    EDIT_ROLE = 'edit-role'

    CHOICES = [
        (EDIT_ROLE, _('Edit Role')),
        (VIEW_ROLE, _('View Role')),
        (EMAIL_FIELD, _('Specify email addresses manually')),
    ]

    agenda = models.OneToOneField(Agenda, on_delete=models.CASCADE, related_name='notifications_settings')

    almost_full_event = models.CharField(
        max_length=16, blank=True, choices=CHOICES, verbose_name=_('Almost full event (90%)')
    )
    almost_full_event_emails = ArrayField(models.EmailField(), blank=True, null=True)

    full_event = models.CharField(max_length=16, blank=True, choices=CHOICES, verbose_name=_('Full event'))
    full_event_emails = ArrayField(models.EmailField(), blank=True, null=True)

    cancelled_event = models.CharField(
        max_length=16, blank=True, choices=CHOICES, verbose_name=_('Cancelled event')
    )
    cancelled_event_emails = ArrayField(models.EmailField(), blank=True, null=True)

    @classmethod
    def get_email_field_names(cls):
        """Return the names of the model fields that are ArrayField instances."""
        names = []
        for model_field in cls._meta.get_fields():
            if isinstance(model_field, ArrayField):
                names.append(model_field.name)
        return names

    @staticmethod
    def get_role_field_names():
        """Return the names of the fields holding a recipient choice."""
        return ['almost_full_event', 'full_event', 'cancelled_event']

    def get_notification_types(self):
        """Yield a NotificationType for each enabled notification."""
        for field_name in self.get_role_field_names():
            candidate = NotificationType(
                name=field_name, related_field=field_name.replace('_event', ''), settings=self
            )
            if not candidate.enabled:
                continue
            yield candidate

    def get_role_from_choice(self, choice):
        """Return the agenda role matching a role choice, None for any other choice."""
        if choice == self.EDIT_ROLE:
            return self.agenda.edit_role
        if choice == self.VIEW_ROLE:
            return self.agenda.view_role
        return None

    @classmethod
    def import_json(cls, data):
        """Create or update the settings of the agenda found in the cleaned import data."""
        cleaned = clean_import_data(cls, data)
        target_agenda = cleaned.pop('agenda')
        cls.objects.update_or_create(agenda=target_agenda, defaults=cleaned)

    def export_json(self):
        """Return the recipient choices and manual addresses as a dict."""
        exported_names = (
            'almost_full_event',
            'almost_full_event_emails',
            'full_event',
            'full_event_emails',
            'cancelled_event',
            'cancelled_event_emails',
        )
        return {name: getattr(self, name) for name in exported_names}

    def duplicate(self, agenda_target):
        """Save and return a copy of these settings, attached to ``agenda_target``."""
        clone = copy.deepcopy(self)
        # without a primary key the copy is stored as a new object
        clone.pk = None
        clone.agenda = agenda_target
        clone.save()
        return clone
class AgendaReminderSettings(models.Model):
    """Reminder configuration of an agenda.

    Stores how many days before a booking email and SMS reminders are sent,
    along with extra text to include in each kind of message.
    """

    ONE_DAY_BEFORE = 1
    TWO_DAYS_BEFORE = 2
    THREE_DAYS_BEFORE = 3
    FOUR_DAYS_BEFORE = 4
    FIVE_DAYS_BEFORE = 5

    CHOICES = [
        (None, _('Never')),
        (ONE_DAY_BEFORE, _('One day before')),
        (TWO_DAYS_BEFORE, _('Two days before')),
        (THREE_DAYS_BEFORE, _('Three days before')),
        (FOUR_DAYS_BEFORE, _('Four days before')),
        (FIVE_DAYS_BEFORE, _('Five days before')),
    ]

    agenda = models.OneToOneField(Agenda, on_delete=models.CASCADE, related_name='reminder_settings')
    days_before_email = models.IntegerField(
        null=True,
        blank=True,
        choices=CHOICES,
        verbose_name=_('Send email reminder'),
        help_text=_(
            'In order to prevent users from getting a reminder shortly after booking, '
            'a reminder is sent less only if at least 12 hours have elapsed since booking time.'
        ),
    )
    email_extra_info = models.TextField(
        blank=True,
        verbose_name=_('Additional text to include in emails'),
        validators=[booking_template_validator],
        help_text=_(
            'Basic information such as event name, time and date are already included. '
            'Booking object can be accessed using standard template syntax. '
            'This allows to access agenda name via {{ booking.event.agenda.label }}, '
            'meeting type name via {{ booking.event.meeting_type.label }}, or any extra '
            'parameter passed on booking creation via {{ booking.extra_data.xxx }}.'
        ),
    )
    days_before_sms = models.IntegerField(
        null=True,
        blank=True,
        choices=CHOICES,
        verbose_name=_('Send SMS reminder'),
        help_text=_(
            'In order to prevent users from getting a reminder shortly after booking, '
            'a reminder is sent less only if at least 12 hours have elapsed since booking time.'
        ),
    )
    sms_extra_info = models.TextField(
        blank=True,
        verbose_name=_('Additional text to include in SMS'),
        validators=[booking_template_validator],
        # same help text as the email variant
        help_text=email_extra_info.help_text,
    )

    def display_info(self):
        """Return a list of sentences describing the configured reminders.

        A single sentence is returned when email and SMS reminders are sent
        the same number of days in advance; the list is empty when no
        reminder is configured.
        """

        def build_message(days, by_email_or_sms):
            template = ngettext(
                'Users will be reminded of their booking %(by_email_or_sms)s, one day in advance.',
                'Users will be reminded of their booking %(by_email_or_sms)s, %(days)s days in advance.',
                days,
            )
            return template % {'days': days, 'by_email_or_sms': by_email_or_sms}

        email_days = self.days_before_email
        sms_days = self.days_before_sms

        if email_days and email_days == sms_days:
            return [build_message(email_days, _('both by email and by SMS'))]

        messages = []
        if email_days:
            messages.append(build_message(email_days, _('by email')))
        if sms_days:
            messages.append(build_message(sms_days, _('by SMS')))
        return messages

    @classmethod
    def import_json(cls, data):
        """Create or update the settings of the agenda found in the cleaned import data."""
        cleaned = clean_import_data(cls, data)
        target_agenda = cleaned.pop('agenda')
        cls.objects.update_or_create(agenda=target_agenda, defaults=cleaned)

    def export_json(self):
        """Return the reminder delays and extra texts as a dict."""
        exported_names = ('days_before_email', 'days_before_sms', 'email_extra_info', 'sms_extra_info')
        return {name: getattr(self, name) for name in exported_names}

    def duplicate(self, agenda_target):
        """Save and return a copy of these settings, attached to ``agenda_target``."""
        clone = copy.deepcopy(self)
        # without a primary key the copy is stored as a new object
        clone.pk = None
        clone.agenda = agenda_target
        clone.save()
        return clone
class Subscription(models.Model):
    """Subscription of a user, identified by an external id, to an agenda for a date range."""

    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE, related_name='subscriptions')
    user_external_id = models.CharField(max_length=250)
    user_last_name = models.CharField(max_length=250)
    user_first_name = models.CharField(max_length=250)
    user_email = models.EmailField(blank=True)
    user_phone_number = models.CharField(max_length=30, blank=True)
    extra_data = models.JSONField(null=True)
    date_start = models.DateField()
    date_end = models.DateField()

    @property
    def user_name(self):
        """Return first and last names joined by a space, without surrounding whitespace."""
        full_name = '%s %s' % (self.user_first_name, self.user_last_name)
        return full_name.strip()

    @property
    def label(self):
        return _('Subscription')

    def get_user_block(self):
        """Render the agenda's booking user block template for this subscription.

        The rendered text is escaped; None is returned when the template is
        invalid or refers to a missing variable.
        """
        # the subscription is exposed to the template under the 'booking' name
        context = Context(settings.TEMPLATE_VARS, autoescape=False)
        context.update({'booking': self})
        try:
            template = Template(self.agenda.get_booking_user_block_template())
            return escape(template.render(context))
        except (VariableDoesNotExist, TemplateSyntaxError):
            return None

    def get_extra_user_block(self, request):
        """Render the agenda's extra user block template with a request context.

        None is returned when the template is invalid or refers to a missing
        variable.
        """
        # the subscription is exposed to the template under the 'booking' name
        request_context = RequestContext(request)
        request_context.update({'booking': self})
        try:
            template = Template(self.agenda.booking_extra_user_block_template)
            return template.render(request_context)
        except (VariableDoesNotExist, TemplateSyntaxError):
            return None
class Person(models.Model):
    """Person known by a unique external user id, with a first and a last name."""

    user_external_id = models.CharField(max_length=250, unique=True)
    first_name = models.CharField(max_length=250)
    last_name = models.CharField(max_length=250)

    def __str__(self):
        name_parts = (self.first_name, self.last_name)
        return '%s %s' % name_parts
@dataclasses.dataclass(frozen=True)
class SharedCustodySlot:
    """Custody of a guardian on a given date.

    Only ``date`` takes part in comparisons: ``guardian`` and ``label`` are
    declared with ``compare=False``.
    """

    guardian: Person = dataclasses.field(compare=False)
    date: datetime.date
    label: str = dataclasses.field(compare=False, default='')

    def __str__(self):
        # the label, when there is one, is appended between parentheses
        if not self.label:
            return str(self.guardian)
        return '%s (%s)' % (self.guardian, self.label)
class SharedCustodyAgenda(models.Model):
    """Agenda describing how the custody of a child is shared between two guardians."""

    first_guardian = models.ForeignKey(
        Person, verbose_name=_('First guardian'), on_delete=models.CASCADE, related_name='+'
    )
    second_guardian = models.ForeignKey(
        Person, verbose_name=_('Second guardian'), on_delete=models.CASCADE, related_name='+'
    )
    child = models.ForeignKey(Person, verbose_name=_('Child'), on_delete=models.CASCADE, related_name='+')
    date_start = models.DateField(_('Start'))
    date_end = models.DateField(_('End'), null=True)

    class Meta:
        constraints = [
            # a child can have a single agenda without end date
            models.UniqueConstraint(
                fields=['child'], condition=Q(date_end__isnull=True), name='unique_child_no_date_end'
            )
        ]

    @property
    def label(self):
        people = {
            'first_guardian': self.first_guardian,
            'second_guardian': self.second_guardian,
            'child': self.child,
        }
        return _('Custody agenda of %(first_guardian)s and %(second_guardian)s for %(child)s') % people

    def get_absolute_url(self):
        url_kwargs = {'pk': self.pk}
        return reverse('chrono-manager-shared-custody-agenda-view', kwargs=url_kwargs)

    def get_settings_url(self):
        url_kwargs = {'pk': self.pk}
        return reverse('chrono-manager-shared-custody-agenda-settings', kwargs=url_kwargs)

    def get_custody_slots(self, min_date, max_date):
        """Return the custody slots from ``min_date`` to ``max_date`` (excluded), sorted by date.

        Slots compare on their date only and the first slot added for a date
        is kept: slots from periods are added before slots from rules, and
        periods are iterated ordered by descending ``holiday_rule``.
        """
        one_day = datetime.timedelta(days=1)
        found_slots = set()

        overlapping_periods = self.periods.filter(date_start__lt=max_date, date_end__gt=min_date)
        overlapping_periods = overlapping_periods.order_by('-holiday_rule')
        overlapping_periods = overlapping_periods.select_related('holiday_rule__holiday', 'guardian')

        for period in overlapping_periods:
            label = period.holiday_rule.holiday.label if period.holiday_rule else ''
            # restrict the period to the requested range; both ends are excluded
            current_day = max(period.date_start, min_date)
            stop_day = min(period.date_end, max_date)
            while current_day < stop_day:
                found_slots.add(SharedCustodySlot(guardian=period.guardian, date=current_day, label=label))
                current_day += one_day

        for rule in self.rules.all().select_related('guardian'):
            found_slots.update(rule.get_slots(min_date, max_date))

        return sorted(found_slots, key=lambda slot: slot.date)

    def is_complete(self):
        """Tell whether rules add up to exactly seven days for both even and odd weeks."""

        def day_count(weeks):
            # number of days of the rules with this ``weeks`` value, 0 if there are none
            return Coalesce(
                SumCardinality('days', filter=Q(weeks=weeks)), 0, output_field=models.IntegerField()
            )

        counts = self.rules.aggregate(
            all_week=day_count(''),
            even_week=day_count('even'),
            odd_week=day_count('odd'),
        )
        even_total = counts['all_week'] + counts['even_week']
        odd_total = counts['all_week'] + counts['odd_week']
        return bool(even_total == 7 and odd_total == 7)

    def rule_overlaps(self, days, weeks, instance=None):
        """Tell whether a rule other than ``instance`` already uses some of ``days`` on these weeks."""
        candidates = self.rules
        if hasattr(instance, 'pk'):
            candidates = candidates.exclude(pk=instance.pk)
        if weeks:
            # rules applying to all weeks conflict with any value of weeks
            candidates = candidates.filter(Q(weeks='') | Q(weeks=weeks))
        return candidates.filter(days__overlap=days).exists()

    def holiday_rule_overlaps(self, holiday, years, periodicity, instance=None):
        """Tell whether a holiday rule other than ``instance`` conflicts with the given parameters."""
        # periodicities that can coexist on the same holiday
        complementary_periodicity = {
            'first-half': 'second-half',
            'second-half': 'first-half',
            'first-and-third-quarters': 'second-and-fourth-quarters',
            'second-and-fourth-quarters': 'first-and-third-quarters',
        }

        candidates = self.holiday_rules.filter(holiday=holiday)
        if hasattr(instance, 'pk'):
            candidates = candidates.exclude(pk=instance.pk)
        if years:
            # rules applying to all years conflict with any value of years
            candidates = candidates.filter(Q(years='') | Q(years=years))

        compatible = complementary_periodicity.get(periodicity)
        if compatible is not None:
            candidates = candidates.exclude(periodicity=compatible)

        return candidates.exists()

    def period_overlaps(self, date_start, date_end, instance=None):
        """Tell whether a period not tied to a holiday rule, other than ``instance``, overlaps the dates."""
        candidates = self.periods.filter(holiday_rule__isnull=True)
        if hasattr(instance, 'pk'):
            candidates = candidates.exclude(pk=instance.pk)
        # overlap is computed by the database, with the SQL OVERLAPS operator
        overlapping = candidates.extra(
            where=["(date_start, date_end) OVERLAPS (%s, %s)"],
            params=[date_start, date_end],
        )
        return overlapping.exists()
class SharedCustodyRule(models.Model):
    """Weekly custody rule: a guardian has custody on given weekdays, on all, even or odd weeks."""

    WEEK_CHOICES = [
        ('', pgettext_lazy('weeks', 'All')),
        ('even', pgettext_lazy('weeks', 'Even')),
        ('odd', pgettext_lazy('weeks', 'Odd')),
    ]

    agenda = models.ForeignKey(SharedCustodyAgenda, on_delete=models.CASCADE, related_name='rules')
    days = ArrayField(
        models.IntegerField(choices=WEEKDAY_CHOICES),
        verbose_name=_('Days'),
    )
    weeks = models.CharField(_('Weeks'), choices=WEEK_CHOICES, blank=True, max_length=16)
    guardian = models.ForeignKey(Person, verbose_name=_('Guardian'), on_delete=models.CASCADE)

    def get_slots(self, min_date, max_date):
        """Return one SharedCustodySlot per matching day from ``min_date`` to ``max_date`` (excluded)."""
        rule_kwargs = {
            'freq': WEEKLY,
            'byweekday': self.days,
        }
        week_numbers = {'odd': range(1, 55, 2), 'even': range(0, 54, 2)}.get(self.weeks)
        if week_numbers is not None:
            rule_kwargs['byweekno'] = list(week_numbers)

        # rrule includes its ``until`` bound, hence the day removed from max_date
        last_day = max_date - datetime.timedelta(days=1)
        occurrences = rrule(dtstart=min_date, until=last_day, **rule_kwargs)
        return [SharedCustodySlot(self.guardian, occurrence.date()) for occurrence in occurrences]

    @property
    def label(self):
        """Return a textual description of the days and weeks of the rule."""
        days_count = len(self.days)
        # first/last difference matching the count is taken as meaning contiguous days
        is_contiguous = days_count > 1 and (self.days[-1] - self.days[0]) == days_count - 1

        if days_count == 7:
            repeat = _('daily')
        elif is_contiguous:
            repeat = _('from %(weekday)s to %(last_weekday)s') % {
                'weekday': str(WEEKDAYS[self.days[0]]),
                'last_weekday': str(WEEKDAYS[self.days[-1]]),
            }
        else:
            plural_names = [str(WEEKDAYS_PLURAL[day]) for day in self.days]
            repeat = _('on %(weekdays)s') % {'weekdays': ', '.join(plural_names)}

        if self.weeks == 'odd':
            return '%s, %s' % (repeat, _('on odd weeks'))
        if self.weeks == 'even':
            return '%s, %s' % (repeat, _('on even weeks'))
        return repeat

    class Meta:
        # ordered on the first day of each rule, then on weeks
        ordering = ['days__0', 'weeks']
class SharedCustodyHolidayRule(models.Model):
    """Custody rule giving a guardian the custody during a holiday.

    The holiday is a TimePeriodExceptionGroup; the rule can be restricted to
    even or odd years, and to a part of the holiday (a half, or two quarters).
    """

    YEAR_CHOICES = [
        ('', pgettext_lazy('years', 'All')),
        ('even', pgettext_lazy('years', 'Even')),
        ('odd', pgettext_lazy('years', 'Odd')),
    ]

    PERIODICITY_CHOICES = [
        ('first-half', _('First half')),
        ('second-half', _('Second half')),
        ('first-and-third-quarters', _('First and third quarters')),
        ('second-and-fourth-quarters', _('Second and fourth quarters')),
    ]

    agenda = models.ForeignKey(SharedCustodyAgenda, on_delete=models.CASCADE, related_name='holiday_rules')
    holiday = models.ForeignKey(TimePeriodExceptionGroup, verbose_name=_('Holiday'), on_delete=models.PROTECT)
    years = models.CharField(_('Years'), choices=YEAR_CHOICES, blank=True, max_length=16)
    # blank periodicity means the whole holiday
    periodicity = models.CharField(_('Periodicity'), choices=PERIODICITY_CHOICES, blank=True, max_length=32)
    guardian = models.ForeignKey(Person, verbose_name=_('Guardian'), on_delete=models.CASCADE)

    def update_or_create_periods(self):
        """Rebuild the SharedCustodyPeriod objects generated by this rule.

        Periods are computed from each exception of the holiday, according to
        the ``years`` and ``periodicity`` of the rule; existing periods of the
        rule are then deleted and the new ones created, in a single
        transaction.
        """
        shared_custody_periods = []
        for exception in self.holiday.exceptions.all():
            date_start = localtime(exception.start_datetime).date()
            # skip exceptions whose start year does not match the parity of the rule
            if self.years == 'even' and date_start.year % 2:
                continue
            if self.years == 'odd' and not date_start.year % 2:
                continue

            # first Sunday on or after the start date, used as reference to split the holiday
            date_start_sunday = date_start + relativedelta(weekday=SU)
            date_end = localtime(exception.end_datetime).date()
            # whole weeks between the reference Sunday and the end date
            number_of_weeks = (date_end - date_start_sunday).days // 7

            periods = []
            if self.periodicity == 'first-half':
                # from the start date to the Sunday in the middle of the holiday
                date_end = date_start_sunday + datetime.timedelta(days=7 * (number_of_weeks // 2))
                periods = [(date_start, date_end)]
            elif self.periodicity == 'second-half':
                # from the Sunday in the middle of the holiday to the end date
                date_start = date_start_sunday + datetime.timedelta(days=7 * (number_of_weeks // 2))
                periods = [(date_start, date_end)]
            elif self.periodicity == 'first-and-third-quarters' and number_of_weeks >= 4:
                # quarters are only computed for holidays of at least 4 weeks
                weeks_in_quarters = round(number_of_weeks / 4)
                first_quarters_date_end = date_start_sunday + datetime.timedelta(days=7 * weeks_in_quarters)
                third_quarters_date_start = date_start_sunday + datetime.timedelta(
                    days=7 * weeks_in_quarters * 2
                )
                third_quarters_date_end = date_start_sunday + datetime.timedelta(
                    days=7 * weeks_in_quarters * 3
                )
                periods = [
                    (date_start, first_quarters_date_end),
                    (third_quarters_date_start, third_quarters_date_end),
                ]
            elif self.periodicity == 'second-and-fourth-quarters' and number_of_weeks >= 4:
                # quarters are only computed for holidays of at least 4 weeks
                weeks_in_quarters = round(number_of_weeks / 4)
                second_quarters_date_start = date_start_sunday + datetime.timedelta(
                    days=7 * weeks_in_quarters
                )
                second_quarters_date_end = date_start_sunday + datetime.timedelta(
                    days=7 * weeks_in_quarters * 2
                )
                fourth_quarters_date_start = date_start_sunday + datetime.timedelta(
                    days=7 * weeks_in_quarters * 3
                )
                periods = [
                    (second_quarters_date_start, second_quarters_date_end),
                    (fourth_quarters_date_start, date_end),
                ]
            elif not self.periodicity:
                # no periodicity: the whole holiday
                periods = [(date_start, date_end)]

            # periods stays empty for a quarters periodicity on a holiday shorter than 4 weeks
            for date_start, date_end in periods:
                shared_custody_periods.append(
                    SharedCustodyPeriod(
                        guardian=self.guardian,
                        agenda=self.agenda,
                        holiday_rule=self,
                        date_start=date_start,
                        date_end=date_end,
                    )
                )

        # replace the periods previously generated by this rule
        with transaction.atomic():
            SharedCustodyPeriod.objects.filter(
                guardian=self.guardian, agenda=self.agenda, holiday_rule=self
            ).delete()
            SharedCustodyPeriod.objects.bulk_create(shared_custody_periods)

    @property
    def label(self):
        """Return the holiday label, followed by the periodicity and years of the rule, if any."""
        label = self.holiday.label

        if self.periodicity == 'first-half':
            label = '%s, %s' % (label, _('the first half'))
        elif self.periodicity == 'second-half':
            label = '%s, %s' % (label, _('the second half'))
        elif self.periodicity == 'first-and-third-quarters':
            label = '%s, %s' % (label, _('the first and third quarters'))
        elif self.periodicity == 'second-and-fourth-quarters':
            label = '%s, %s' % (label, _('the second and fourth quarters'))

        if self.years == 'odd':
            label = '%s, %s' % (label, _('on odd years'))
        elif self.years == 'even':
            label = '%s, %s' % (label, _('on even years'))

        return label

    class Meta:
        ordering = ['holiday__label', 'guardian', 'years', 'periodicity']
class SharedCustodyPeriod(models.Model):
    """Period during which a guardian has the custody, within a shared custody agenda.

    A period is optionally tied to a holiday rule. A period whose end date is
    the day after its start date is displayed as a single day.
    """

    agenda = models.ForeignKey(SharedCustodyAgenda, on_delete=models.CASCADE, related_name='periods')
    guardian = models.ForeignKey(Person, on_delete=models.CASCADE, related_name='+')
    holiday_rule = models.ForeignKey(SharedCustodyHolidayRule, null=True, on_delete=models.CASCADE)
    date_start = models.DateField(_('Start'))
    date_end = models.DateField(_('End'))

    class Meta:
        ordering = ['date_start']

    def __str__(self):
        """Return the guardian followed by the date, or dates, of the period."""
        start_repr = date_format(self.date_start, 'SHORT_DATE_FORMAT')
        if self.date_end == self.date_start + datetime.timedelta(days=1):
            # single day period, only the start date is displayed
            exc_repr = '%s' % start_repr
        else:
            # dates are separated by an arrow; without a separator they are
            # glued together and unreadable
            exc_repr = '%s → %s' % (
                start_repr,
                date_format(self.date_end, 'SHORT_DATE_FORMAT'),
            )
        return '%s, %s' % (self.guardian, exc_repr)
class SharedCustodySettings(models.Model):
    """Settings of shared custody agendas: a management role and a holidays calendar.

    The settings are meant to be read through ``get_singleton()``.
    """

    management_role = models.ForeignKey(
        Group,
        blank=True,
        null=True,
        default=None,
        related_name='+',
        verbose_name=_('Management role'),
        on_delete=models.SET_NULL,
    )
    holidays_calendar = models.ForeignKey(
        UnavailabilityCalendar,
        verbose_name=_('Holidays calendar'),
        null=True,
        blank=True,
        related_name='+',
        on_delete=models.SET_NULL,
    )

    def export_json(self):
        """Return the settings as a dict, the role by name and the calendar by slug."""
        role = self.management_role
        calendar = self.holidays_calendar
        return {
            'management_role': role.name if role else None,
            'holidays_calendar': calendar.slug if calendar else None,
        }

    @classmethod
    def import_json(cls, data):
        """Create or update the settings from an exported dict.

        The role name and the calendar slug found in ``data`` are replaced,
        in place, by the matching objects. AgendaImportError is raised when
        the calendar does not exist.
        """
        role_name = data.get('management_role')
        if role_name:
            data['management_role'] = Group.objects.get(name=role_name)

        calendar_slug = data.get('holidays_calendar')
        if calendar_slug:
            try:
                data['holidays_calendar'] = UnavailabilityCalendar.objects.get(slug=calendar_slug)
            except UnavailabilityCalendar.DoesNotExist:
                raise AgendaImportError(
                    _('The unavailability calendar "%s" does not exist.') % calendar_slug
                )

        cls.objects.update_or_create(defaults=data)

    @classmethod
    def get_singleton(cls):
        """Return the stored settings, or a new unsaved instance when there are none."""
        try:
            instance = cls.objects.get()
        except cls.DoesNotExist:
            instance = cls()
        return instance