chrono/chrono/agendas/models.py

696 lines
28 KiB
Python

# -*- coding: utf-8 -*-
# chrono - agendas system
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import fractions
import requests
import vobject
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.exceptions import ValidationError
from django.db import models, transaction
from django.db.models import Q
from django.urls import reverse
from django.utils.dates import WEEKDAYS
from django.utils.encoding import force_text, python_2_unicode_compatible
from django.utils.formats import date_format
from django.utils.text import slugify
from django.utils.timezone import localtime, now, make_aware, make_naive, is_aware
from django.utils.translation import ugettext_lazy as _
from jsonfield import JSONField
from ..interval import Intervals
# Supported agenda kinds as (stored value, translatable label) pairs; used as
# the choices of the Agenda.kind field.
AGENDA_KINDS = (
('events', _('Events')),
('meetings', _('Meetings')),
)
def is_midnight(dtime):
    """Tell whether ``dtime``, once passed through ``localtime``, is at 00:00.

    Only the hour and minute components are compared.
    """
    local_dtime = localtime(dtime)
    return (local_dtime.hour, local_dtime.minute) == (0, 0)
def generate_slug(instance, **query_filters):
    """Return a slug derived from ``instance.label`` that is not yet in use.

    The slugified label is tried first; while an object of the same model
    matching ``query_filters`` already uses the candidate, a numeric suffix
    (``-1``, ``-2``, ...) is appended to the base slug.
    """
    manager = instance._meta.model.objects
    base_slug = slugify(instance.label)
    candidate = base_slug
    suffix = 0
    while manager.filter(slug=candidate, **query_filters).exists():
        suffix += 1
        candidate = '%s-%s' % (base_slug, suffix)
    return candidate
class ICSError(Exception):
    """Error raised while retrieving or parsing iCalendar (ICS) data."""
class AgendaImportError(Exception):
    """Error raised when an agenda cannot be imported from its JSON export."""
class Agenda(models.Model):
    """An agenda, of kind 'events' or 'meetings' (see ``AGENDA_KINDS``).

    Booking delays are expressed in days; edit/view permissions are granted
    through optional ``Group`` roles.
    """
    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
    kind = models.CharField(_('Kind'), max_length=20, choices=AGENDA_KINDS, default='events')
    minimal_booking_delay = models.PositiveIntegerField(
        _('Minimal booking delay (in days)'), default=1)
    maximal_booking_delay = models.PositiveIntegerField(
        _('Maximal booking delay (in days)'), default=56)  # eight weeks
    edit_role = models.ForeignKey(Group, blank=True, null=True, default=None,
                                  related_name='+', verbose_name=_('Edit Role'),
                                  on_delete=models.SET_NULL)
    view_role = models.ForeignKey(Group, blank=True, null=True, default=None,
                                  related_name='+', verbose_name=_('View Role'),
                                  on_delete=models.SET_NULL)

    class Meta:
        ordering = ['label']

    def save(self, *args, **kwargs):
        """Save the agenda, generating a slug from the label when none is set."""
        if not self.slug:
            self.slug = generate_slug(self)
        super(Agenda, self).save(*args, **kwargs)

    def get_absolute_url(self):
        """Return the URL of the 'chrono-manager-agenda-view' view for this agenda."""
        return reverse('chrono-manager-agenda-view', kwargs={'pk': self.id})

    def can_be_managed(self, user):
        """Return True if ``user`` is staff or belongs to the edit role group."""
        if user.is_staff:
            return True
        group_ids = [x.id for x in user.groups.all()]
        return bool(self.edit_role_id in group_ids)

    def can_be_viewed(self, user):
        """Return True if ``user`` can manage the agenda or belongs to the view role group."""
        if self.can_be_managed(user):
            return True
        group_ids = [x.id for x in user.groups.all()]
        return bool(self.view_role_id in group_ids)

    def get_base_meeting_duration(self):
        """Return the greatest common divisor of this agenda's meeting type durations.

        Raises:
            ValueError: if the agenda has no meeting type.
        """
        # fractions.gcd was removed in Python 3.9; prefer math.gcd and only
        # fall back to fractions.gcd on interpreters lacking it (Python 2).
        try:
            from math import gcd
        except ImportError:
            from fractions import gcd
        durations = [x.duration for x in MeetingType.objects.filter(agenda=self)]
        if not durations:
            raise ValueError()
        result = durations[0]
        for duration in durations[1:]:
            result = gcd(duration, result)
        return result

    def export_json(self):
        """Return a dict describing the agenda and its related objects.

        Roles are exported by group name. Events are included for 'events'
        agendas; meeting types and desks for 'meetings' agendas.
        """
        agenda = {
            'label': self.label,
            'slug': self.slug,
            'kind': self.kind,
            'minimal_booking_delay': self.minimal_booking_delay,
            'maximal_booking_delay': self.maximal_booking_delay,
            'permissions': {
                'view': self.view_role.name if self.view_role else None,
                'edit': self.edit_role.name if self.edit_role else None,
            }
        }
        if self.kind == 'events':
            agenda['events'] = [x.export_json() for x in self.event_set.all()]
        elif self.kind == 'meetings':
            agenda['meetingtypes'] = [x.export_json() for x in self.meetingtype_set.all()]
            agenda['desks'] = [desk.export_json() for desk in self.desk_set.all()]
        return agenda

    @classmethod
    def import_json(cls, data, overwrite=False):
        """Create or update an agenda from a dict shaped like ``export_json`` output.

        Args:
            data: agenda description; a shallow copy is made before keys are
                popped, but the nested event/meeting type/desk dicts are
                modified (an 'agenda' key is added to each).
            overwrite: when true, existing events (or meeting types and desks)
                of the agenda are deleted before the imported ones are saved.

        Returns:
            True if the agenda was created, False if an existing one (matched
            by slug) was updated.

        Raises:
            AgendaImportError: if a role named in 'permissions' does not exist.
        """
        data = data.copy()
        permissions = data.pop('permissions') or {}
        if data['kind'] == 'events':
            events = data.pop('events')
        elif data['kind'] == 'meetings':
            meetingtypes = data.pop('meetingtypes')
            desks = data.pop('desks')
        for permission in ('view', 'edit'):
            if permissions.get(permission):
                try:
                    data[permission + '_role'] = Group.objects.get(name=permissions[permission])
                except Group.DoesNotExist:
                    raise AgendaImportError(_('Missing "%s" role') % permissions[permission])
        agenda, created = cls.objects.get_or_create(slug=data['slug'], defaults=data)
        if not created:
            for k, v in data.items():
                setattr(agenda, k, v)
            # persist the updated attributes, otherwise they are silently lost
            agenda.save()
        if data['kind'] == 'events':
            if overwrite:
                Event.objects.filter(agenda=agenda).delete()
            for event_data in events:
                event_data['agenda'] = agenda
                Event.import_json(event_data).save()
        elif data['kind'] == 'meetings':
            if overwrite:
                MeetingType.objects.filter(agenda=agenda).delete()
                Desk.objects.filter(agenda=agenda).delete()
            for type_data in meetingtypes:
                type_data['agenda'] = agenda
                MeetingType.import_json(type_data).save()
            for desk in desks:
                desk['agenda'] = agenda
                Desk.import_json(desk).save()
        return created
# Items of django.utils.dates.WEEKDAYS sorted by key; used as the choices of
# the TimePeriod.weekday field.
WEEKDAYS_LIST = sorted(WEEKDAYS.items(), key=lambda x: x[0])
@python_2_unicode_compatible
class TimeSlot(object):
    """A candidate meeting slot on a desk (plain object, not a model).

    ``end_datetime`` is ``start_datetime`` plus the meeting type duration in
    minutes; ``id`` is '<meeting type id>:<start as YYYY-MM-DD-HHMM>'.
    """

    def __init__(self, start_datetime, meeting_type, desk):
        slot_length = datetime.timedelta(minutes=meeting_type.duration)
        start_stamp = start_datetime.strftime('%Y-%m-%d-%H%M')
        self.start_datetime = start_datetime
        self.end_datetime = start_datetime + slot_length
        self.meeting_type = meeting_type
        self.id = '%s:%s' % (meeting_type.id, start_stamp)
        self.desk = desk

    def __str__(self):
        # the slot is displayed as its start date/time
        return date_format(self.start_datetime, format='DATETIME_FORMAT')
@python_2_unicode_compatible
class TimePeriod(models.Model):
    """Weekly opening period of a desk: a weekday plus start and end times."""
    weekday = models.IntegerField(_('Week day'), choices=WEEKDAYS_LIST)
    start_time = models.TimeField(_('Start'))
    end_time = models.TimeField(_('End'))
    desk = models.ForeignKey('Desk', on_delete=models.CASCADE)

    class Meta:
        ordering = ['weekday', 'start_time']

    def __str__(self):
        # \u2192 is a rightwards arrow; without a separator the start and end
        # times were rendered glued together.
        return u'%s / %s \u2192 %s' % (
            force_text(WEEKDAYS[self.weekday]),
            date_format(self.start_time, 'TIME_FORMAT'),
            date_format(self.end_time, 'TIME_FORMAT'))

    @property
    def weekday_str(self):
        """Label of the weekday, looked up in ``WEEKDAYS``."""
        return WEEKDAYS[self.weekday]

    @classmethod
    def import_json(cls, data):
        """Return an unsaved instance built from ``data`` used as keyword arguments."""
        return cls(**data)

    def export_json(self):
        """Return a dict with the weekday and the start/end times as 'HH:MM' strings."""
        return {
            'weekday': self.weekday,
            'start_time': self.start_time.strftime('%H:%M'),
            'end_time': self.end_time.strftime('%H:%M'),
        }

    def get_time_slots(self, min_datetime, max_datetime, meeting_type):
        """Yield ``TimeSlot`` objects for ``meeting_type`` within this period.

        Slots start at the period's start time on the first matching weekday
        on or after ``min_datetime``'s date, are spaced by the agenda's base
        meeting duration, and repeat every week until ``max_datetime``.

        Args:
            min_datetime: aware datetime, lower bound of the search.
            max_datetime: aware datetime, upper bound of the search.
            meeting_type: object whose ``duration`` (minutes) is the slot length.
        """
        meeting_duration = datetime.timedelta(minutes=meeting_type.duration)
        # step between two consecutive slot starts
        duration = datetime.timedelta(minutes=self.desk.agenda.get_base_meeting_duration())

        # the computations below are done on naive datetimes
        min_datetime = make_naive(min_datetime)
        max_datetime = make_naive(max_datetime)

        # move to this period's weekday, in the following week if it falls
        # before min_datetime
        real_min_datetime = min_datetime + datetime.timedelta(
            days=self.weekday - min_datetime.weekday())
        if real_min_datetime < min_datetime:
            real_min_datetime += datetime.timedelta(days=7)

        event_datetime = real_min_datetime.replace(
            hour=self.start_time.hour, minute=self.start_time.minute,
            second=0, microsecond=0)
        while event_datetime < max_datetime:
            end_time = event_datetime + meeting_duration
            next_time = event_datetime + duration
            if end_time.time() > self.end_time or event_datetime.date() != next_time.date():
                # back to morning
                event_datetime = event_datetime.replace(
                    hour=self.start_time.hour, minute=self.start_time.minute)
                # but next week
                event_datetime += datetime.timedelta(days=7)
                next_time = event_datetime + duration

            if event_datetime > max_datetime:
                break

            yield TimeSlot(start_datetime=make_aware(event_datetime),
                           meeting_type=meeting_type, desk=self.desk)
            event_datetime = next_time
class MeetingType(models.Model):
    """Kind of meeting bookable in an agenda, with a duration in minutes."""
    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160)
    duration = models.IntegerField(_('Duration (in minutes)'), default=30)

    class Meta:
        ordering = ['duration', 'label']
        unique_together = ['agenda', 'slug']

    def save(self, *args, **kwargs):
        """Save the meeting type, generating a slug unique in its agenda if unset."""
        if self.slug:
            pass
        else:
            self.slug = generate_slug(self, agenda=self.agenda)
        super(MeetingType, self).save(*args, **kwargs)

    @classmethod
    def import_json(cls, data):
        """Get or create the meeting type matching ``data``'s slug and agenda.

        An existing object gets its attributes set from ``data`` but is not
        saved here; the object is returned in both cases.
        """
        meeting_type, created = cls.objects.get_or_create(
            slug=data['slug'], agenda=data['agenda'], defaults=data)
        if created:
            return meeting_type
        for name, value in data.items():
            setattr(meeting_type, name, value)
        return meeting_type

    def export_json(self):
        """Return the label, slug and duration as a dict."""
        exported = {}
        exported['label'] = self.label
        exported['slug'] = self.slug
        exported['duration'] = self.duration
        return exported
@python_2_unicode_compatible
class Event(models.Model):
    """Dated event of an agenda, with a number of places and an optional waiting list.

    ``meeting_type`` and ``desk`` are nullable foreign keys.
    """
    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    start_datetime = models.DateTimeField(_('Date/time'))
    places = models.PositiveIntegerField(_('Places'))
    waiting_list_places = models.PositiveIntegerField(
        _('Places in waiting list'), default=0)
    label = models.CharField(_('Label'), max_length=150, null=True, blank=True,
                             help_text=_('Optional label to identify this date.'))
    slug = models.SlugField(_('Identifier'), max_length=160, null=True, blank=True, default=None)
    description = models.TextField(_('Description'), null=True, blank=True,
                                   help_text=_('Optional event description.'))
    full = models.BooleanField(default=False)
    meeting_type = models.ForeignKey(MeetingType, null=True, on_delete=models.CASCADE)
    desk = models.ForeignKey('Desk', null=True, on_delete=models.CASCADE)

    class Meta:
        ordering = ['agenda', 'start_datetime', 'label']
        unique_together = ('agenda', 'slug')

    def __str__(self):
        if self.label:
            return self.label
        return date_format(localtime(self.start_datetime), format='DATETIME_FORMAT')

    def save(self, *args, **kwargs):
        """Recompute the ``full`` flag, then save."""
        self.check_full()
        return super(Event, self).save(*args, **kwargs)

    def check_full(self):
        """Update ``self.full`` from the current bookings (without saving).

        The event is full when all places are booked and there is no waiting
        list, or when there is a waiting list and it is itself full.
        """
        self.full = bool(
            (self.booked_places >= self.places and self.waiting_list_places == 0) or
            (self.waiting_list_places and self.waiting_list >= self.waiting_list_places))

    def in_bookable_period(self):
        """Return True if the event can be booked today.

        Booking is refused once today's date is past the start date minus the
        agenda's minimal delay, while today's date has not gone past the
        start date minus the maximal delay (when one is set), and once the
        event has started.
        """
        today = localtime(now()).date()
        minimal_delay = datetime.timedelta(days=self.agenda.minimal_booking_delay)
        if today > localtime(self.start_datetime - minimal_delay).date():
            return False
        if self.agenda.maximal_booking_delay:
            maximal_delay = datetime.timedelta(days=self.agenda.maximal_booking_delay)
            if today <= localtime(self.start_datetime - maximal_delay).date():
                return False
        if self.start_datetime < now():
            # past the event date, we may want in the future to allow for some
            # extra late booking but it's forbidden for now.
            return False
        return True

    @property
    def booked_places(self):
        """Number of non-cancelled bookings that are not in the waiting list."""
        return self.booking_set.filter(cancellation_datetime__isnull=True,
                                       in_waiting_list=False).count()

    @property
    def waiting_list(self):
        """Number of non-cancelled bookings that are in the waiting list."""
        return self.booking_set.filter(cancellation_datetime__isnull=True,
                                       in_waiting_list=True).count()

    @property
    def end_datetime(self):
        """Start date/time plus the meeting type duration (in minutes)."""
        return self.start_datetime + datetime.timedelta(minutes=self.meeting_type.duration)

    def get_absolute_url(self):
        """Return the URL of the 'chrono-manager-event-edit' view for this event."""
        return reverse('chrono-manager-event-edit', kwargs={'pk': self.id})

    @classmethod
    def import_json(cls, data):
        """Build an event from a dict shaped like ``export_json`` output.

        ``data['start_datetime']`` ('%Y-%m-%d %H:%M:%S', naive) is replaced in
        place by the parsed aware datetime. When ``data`` has a slug, the
        matching event is fetched or created; an existing one gets its
        attributes set from ``data`` but is not saved here. Without a slug, a
        new unsaved instance is returned.
        """
        data['start_datetime'] = make_aware(datetime.datetime.strptime(
            data['start_datetime'], '%Y-%m-%d %H:%M:%S'))
        if data.get('slug'):
            # slugs are only unique within an agenda (see Meta.unique_together),
            # so the agenda must be part of the lookup to avoid picking up (or
            # colliding with) an event of another agenda.
            lookup = {'slug': data['slug']}
            if 'agenda' in data:
                lookup['agenda'] = data['agenda']
            event, created = cls.objects.get_or_create(defaults=data, **lookup)
            if not created:
                for k, v in data.items():
                    setattr(event, k, v)
            return event
        return cls(**data)

    def export_json(self):
        """Return a dict of the event's exportable fields; the start is a naive string."""
        return {
            'start_datetime': make_naive(self.start_datetime).strftime('%Y-%m-%d %H:%M:%S'),
            'places': self.places,
            'waiting_list_places': self.waiting_list_places,
            'label': self.label,
            'slug': self.slug,
            'description': self.description,
        }
class Booking(models.Model):
    """Booking of an event, possibly cancelled or placed in the waiting list."""
    event = models.ForeignKey(Event, on_delete=models.CASCADE)
    extra_data = JSONField(null=True)
    cancellation_datetime = models.DateTimeField(null=True)
    in_waiting_list = models.BooleanField(default=False)
    creation_datetime = models.DateTimeField(auto_now_add=True)
    # primary booking is used to group multiple bookings together
    primary_booking = models.ForeignKey('self', null=True,
                                        on_delete=models.CASCADE,
                                        related_name='secondary_booking_set')

    label = models.CharField(max_length=250, blank=True)
    user_display_label = models.CharField(
        verbose_name=_('Label displayed to user'), max_length=250, blank=True)
    user_name = models.CharField(max_length=250, blank=True)
    backoffice_url = models.URLField(blank=True)

    def save(self, *args, **kwargs):
        """Save the booking and refresh the event's ``full`` flag atomically.

        The event is only saved when its ``full`` flag actually changed.
        """
        with transaction.atomic():
            super(Booking, self).save(*args, **kwargs)
            initial_value = self.event.full
            self.event.check_full()
            if self.event.full != initial_value:
                self.event.save()

    def cancel(self):
        """Mark this booking and its secondary bookings as cancelled now."""
        timestamp = now()
        with transaction.atomic():
            self.secondary_booking_set.update(cancellation_datetime=timestamp)
            self.cancellation_datetime = timestamp
            self.save()

    def accept(self):
        """Take this booking and its secondary bookings out of the waiting list."""
        self.in_waiting_list = False
        with transaction.atomic():
            self.secondary_booking_set.update(in_waiting_list=False)
            self.save()

    def get_ics(self, request=None):
        """Return the booking serialized as an iCalendar document.

        Args:
            request: optional request; for 'description', 'location',
                'comment' and 'url', a non-empty ``request.GET`` value takes
                precedence over the value stored in ``extra_data``.
        """
        ics = vobject.iCalendar()
        ics.add('prodid').value = '-//Entr\'ouvert//NON SGML Publik'
        vevent = vobject.newFromBehavior('vevent')
        vevent.add('uid').value = '%s-%s-%s' % (
            self.event.start_datetime.isoformat(), self.event.agenda.pk, self.pk)
        vevent.add('summary').value = self.user_display_label or self.label
        vevent.add('dtstart').value = self.event.start_datetime
        if self.user_name:
            vevent.add('attendee').value = self.user_name
        if self.event.meeting_type:
            vevent.add('dtend').value = self.event.start_datetime + datetime.timedelta(
                minutes=self.event.meeting_type.duration)

        # extra_data is a nullable field: treat None as "no extra data"
        # instead of failing with AttributeError.
        extra_data = self.extra_data or {}
        for field in ('description', 'location', 'comment', 'url'):
            field_value = None
            if request:
                field_value = request.GET.get(field)
            if not field_value:
                field_value = extra_data.get(field)
            if field_value:
                vevent.add(field).value = field_value
        ics.add(vevent)
        return ics.serialize()
@python_2_unicode_compatible
class Desk(models.Model):
    """Desk of an agenda, owning time periods and time period exceptions."""
    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160)
    timeperiod_exceptions_remote_url = models.URLField(
        _('URL to fetch time period exceptions from'),
        blank=True, max_length=500)

    def __str__(self):
        return self.label

    class Meta:
        ordering = ['label']
        unique_together = ['agenda', 'slug']

    def save(self, *args, **kwargs):
        """Save the desk, generating a slug unique in its agenda if unset."""
        if not self.slug:
            self.slug = generate_slug(self, agenda=self.agenda)
        super(Desk, self).save(*args, **kwargs)

    @classmethod
    def import_json(cls, data):
        """Get or create the desk matching ``data``'s slug and agenda.

        'timeperiods' and 'exceptions' are popped from ``data``; each of them
        is imported and saved attached to the desk. An existing desk gets its
        attributes set from ``data`` but is not saved here.
        """
        timeperiods = data.pop('timeperiods')
        exceptions = data.pop('exceptions')
        instance, created = cls.objects.get_or_create(
            slug=data['slug'], agenda=data['agenda'], defaults=data)
        if not created:
            for k, v in data.items():
                setattr(instance, k, v)
        for timeperiod in timeperiods:
            timeperiod['desk'] = instance
            TimePeriod.import_json(timeperiod).save()
        for exception in exceptions:
            exception['desk'] = instance
            TimePeriodException.import_json(exception).save()
        return instance

    def export_json(self):
        """Return a dict with the label, slug, time periods and exceptions."""
        return {
            'label': self.label,
            'slug': self.slug,
            'timeperiods': [time_period.export_json()
                            for time_period in self.timeperiod_set.all()],
            'exceptions': [exception.export_json()
                           for exception in self.timeperiodexception_set.all()],
        }

    def get_exceptions_within_two_weeks(self):
        """Return the exceptions that are ongoing or end within 14 days.

        When there is none, return a list holding only the next upcoming
        exception, or an empty list if there is no upcoming exception.
        """
        in_two_weeks = make_aware(datetime.datetime.today() + datetime.timedelta(days=14))
        exceptions = self.timeperiodexception_set.filter(end_datetime__gte=now()).filter(
            Q(end_datetime__lte=in_two_weeks) | Q(start_datetime__lt=now()))
        if exceptions.exists():
            return exceptions
        # if none found within the 2 coming weeks, return the next one
        next_exception = self.timeperiodexception_set.filter(
            start_datetime__gte=now()).order_by('start_datetime').first()
        if next_exception:
            return [next_exception]
        return []

    def are_all_exceptions_displayed(self):
        """Tell whether ``get_exceptions_within_two_weeks`` covers every exception."""
        in_two_weeks = self.get_exceptions_within_two_weeks()
        return self.timeperiodexception_set.count() == len(in_two_weeks)

    def create_timeperiod_exceptions_from_remote_ics(self, url, timeout=30):
        """Fetch an ICS calendar from ``url`` and sync exceptions from it by UID.

        Args:
            url: location of the remote calendar.
            timeout: value passed to ``requests.get`` so that an unresponsive
                server cannot block the caller forever.

        Returns:
            The number of exceptions created.

        Raises:
            ICSError: if the calendar cannot be retrieved or parsed.
        """
        try:
            response = requests.get(url, proxies=settings.REQUESTS_PROXIES, timeout=timeout)
            response.raise_for_status()
        except requests.HTTPError as e:
            raise ICSError(
                _('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).') %
                {'url': url, 'status_code': e.response.status_code})
        except requests.RequestException as e:
            raise ICSError(
                _('Failed to retrieve remote calendar (%(url)s, %(exception)s).') %
                {'url': url, 'exception': e})

        return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)

    def remove_timeperiod_exceptions_from_remote_ics(self):
        """Delete the exceptions of this desk that have a non-empty external id."""
        TimePeriodException.objects.filter(desk=self).exclude(external_id='').delete()

    def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False,
                                              recurring_days=600):
        """Create or update exceptions of this desk from ICS ``data``.

        Args:
            data: content of the ICS calendar.
            keep_synced_by_uid: when true, exceptions are matched on the
                event UID (stored as ``external_id``), a calendar without any
                event is accepted, and exceptions with an external id that
                were not refreshed by this run are deleted. Otherwise
                exceptions are matched on their label.
            recurring_days: recurring events are expanded up to this many
                days after now.

        Returns:
            The number of exceptions created.

        Raises:
            ICSError: if the data cannot be parsed, contains no event (when
                not syncing by UID), or an event lacks a start date or, when
                syncing by UID, a UID.
        """
        try:
            parsed = vobject.readOne(data)
        except vobject.base.ParseError:
            raise ICSError(_('File format is invalid.'))

        total_created = 0

        if not parsed.contents.get('vevent') and not keep_synced_by_uid:
            raise ICSError(_('The file doesn\'t contain any events.'))

        with transaction.atomic():
            update_datetime = now()
            for vevent in parsed.contents.get('vevent', []):
                if 'summary' in vevent.contents:
                    summary = force_text(vevent.contents['summary'][0].value)
                else:
                    summary = _('Exception')
                try:
                    start_dt = vevent.dtstart.value
                    if not isinstance(start_dt, datetime.datetime):
                        # plain date: start at midnight
                        start_dt = datetime.datetime.combine(start_dt, datetime.datetime.min.time())
                    if not is_aware(start_dt):
                        start_dt = make_aware(start_dt)
                except AttributeError:
                    raise ICSError(_('Event "%s" has no start date.') % summary)
                try:
                    end_dt = vevent.dtend.value
                    if not isinstance(end_dt, datetime.datetime):
                        end_dt = datetime.datetime.combine(end_dt, datetime.datetime.min.time())
                    if not is_aware(end_dt):
                        end_dt = make_aware(end_dt)
                    duration = end_dt - start_dt
                except AttributeError:
                    try:
                        duration = vevent.duration.value
                        end_dt = start_dt + duration
                    except AttributeError:
                        # events without end date are considered as ending the same day
                        end_dt = make_aware(datetime.datetime.combine(
                            start_dt, datetime.datetime.max.time()))
                        duration = end_dt - start_dt

                event = {}
                event['start_datetime'] = start_dt
                event['end_datetime'] = end_dt
                event['label'] = summary

                kwargs = {}
                kwargs['desk'] = self
                kwargs['recurrence_id'] = 0
                if keep_synced_by_uid:
                    try:
                        kwargs['external_id'] = vevent.contents['uid'][0].value
                    except (KeyError, IndexError):
                        # report a malformed event like the other ICS errors
                        # rather than with a bare KeyError
                        raise ICSError(_('Event "%s" has no UID.') % summary)
                else:
                    kwargs['label'] = summary

                if not vevent.rruleset:
                    # classical event
                    obj, created = TimePeriodException.objects.update_or_create(
                        defaults=event, **kwargs)
                    if created:
                        total_created += 1
                elif vevent.rruleset.count():
                    # recurring event until recurring_days in the future
                    from_dt = start_dt
                    until_dt = update_datetime + datetime.timedelta(days=recurring_days)
                    if not is_aware(vevent.rruleset[0]):
                        # the rule set yields naive datetimes: query it with naive bounds
                        from_dt = make_naive(from_dt)
                        until_dt = make_naive(until_dt)
                    i = -1
                    for i, start_dt in enumerate(
                            vevent.rruleset.between(from_dt, until_dt, inc=True)):
                        # recompute start_dt and end_dt from occurrences and duration
                        if not is_aware(start_dt):
                            start_dt = make_aware(start_dt)
                        end_dt = start_dt + duration
                        kwargs['recurrence_id'] = i
                        event['start_datetime'] = start_dt
                        event['end_datetime'] = end_dt
                        if end_dt < update_datetime:
                            # occurrence already over: only update it if it exists
                            TimePeriodException.objects.filter(**kwargs).update(**event)
                        else:
                            obj, created = TimePeriodException.objects.update_or_create(
                                defaults=event, **kwargs)
                            if created:
                                total_created += 1
                    # delete unseen occurrences
                    kwargs.pop('recurrence_id', None)
                    TimePeriodException.objects.filter(recurrence_id__gt=i, **kwargs).delete()

            if keep_synced_by_uid:
                # delete all outdated exceptions from remote calendar
                TimePeriodException.objects.filter(
                    update_datetime__lt=update_datetime,
                    desk=self).exclude(external_id='').delete()

        return total_created

    def get_opening_hours(self, date):
        """Return the open intervals of this desk on ``date``.

        Time periods of that weekday are added to an ``Intervals`` object,
        exceptions overlapping the day are removed from it, and the result
        of searching it over the whole day is returned.
        """
        openslots = Intervals()
        for timeperiod in self.timeperiod_set.filter(weekday=date.weekday()):
            start_datetime = make_aware(datetime.datetime.combine(date, timeperiod.start_time))
            end_datetime = make_aware(datetime.datetime.combine(date, timeperiod.end_time))
            openslots.add(start_datetime, end_datetime)

        aware_date = make_aware(datetime.datetime(date.year, date.month, date.day))
        aware_next_date = aware_date + datetime.timedelta(days=1)
        for exception in self.timeperiodexception_set.filter(
                start_datetime__lt=aware_next_date,
                end_datetime__gt=aware_date):
            openslots.remove(exception.start_datetime, exception.end_datetime)

        return openslots.search(aware_date, aware_next_date)
@python_2_unicode_compatible
class TimePeriodException(models.Model):
    """Date/time range attached to a desk; it is removed from the desk's opening hours.

    ``external_id`` is filled for exceptions synced from a remote calendar
    and ``recurrence_id`` numbers the occurrences of a recurring event.
    """
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
    external_id = models.CharField(_('External ID'), max_length=256, blank=True)
    label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
    start_datetime = models.DateTimeField(_('Exception start time'))
    end_datetime = models.DateTimeField(_('Exception end time'))
    update_datetime = models.DateTimeField(auto_now=True)
    recurrence_id = models.PositiveIntegerField(_('Recurrence ID'), default=0)

    class Meta:
        ordering = ['start_datetime']

    def __str__(self):
        # \u2192 is a rightwards arrow separating the start from the end;
        # without a separator both values were rendered glued together.
        local_start = localtime(self.start_datetime)
        local_end = localtime(self.end_datetime)
        if is_midnight(self.start_datetime) and is_midnight(self.end_datetime):
            # if both dates are at midnight don't include the time part
            if self.end_datetime == self.start_datetime + datetime.timedelta(days=1):
                # a single day
                exc_repr = u'%s' % date_format(local_start, 'SHORT_DATE_FORMAT')
            else:
                exc_repr = u'%s \u2192 %s' % (
                    date_format(local_start, 'SHORT_DATE_FORMAT'),
                    date_format(local_end, 'SHORT_DATE_FORMAT'))
        else:
            if local_start.date() == local_end.date():
                # same day
                exc_repr = u'%s \u2192 %s' % (
                    date_format(local_start, 'SHORT_DATETIME_FORMAT'),
                    date_format(local_end, 'TIME_FORMAT'))
            else:
                exc_repr = u'%s \u2192 %s' % (
                    date_format(local_start, 'SHORT_DATETIME_FORMAT'),
                    date_format(local_end, 'SHORT_DATETIME_FORMAT'))

        if self.label:
            exc_repr = u'%s (%s)' % (self.label, exc_repr)

        return exc_repr

    def clean(self):
        """Refuse the exception if a non-cancelled booking exists within it.

        Raises:
            ValidationError: if ``has_booking_within_time_slot`` is true.
        """
        super(TimePeriodException, self).clean()
        if self.has_booking_within_time_slot():
            raise ValidationError(_('One or several bookings exists within this time slot.'))

    def has_booking_within_time_slot(self):
        """Tell whether an event of the desk with a non-cancelled booking starts in the range.

        The range is half-open: start included, end excluded. Returns False
        when the start or the end of the exception is not set.
        """
        if not (self.start_datetime and self.end_datetime):
            # incomplete time period, can't tell
            return False

        # let the database apply the range test instead of loading every
        # booked event of the desk
        return Event.objects.filter(
            desk=self.desk,
            booking__isnull=False,
            booking__cancellation_datetime__isnull=True,
            start_datetime__gte=self.start_datetime,
            start_datetime__lt=self.end_datetime).exists()

    @classmethod
    def import_json(cls, data):
        """Return an unsaved instance built from ``data``.

        Values whose key ends with '_datetime' are replaced in place by the
        aware datetime parsed from their '%Y-%m-%d %H:%M:%S' string.
        """
        def import_datetime(s):
            '''Import datetime as a naive ISO8601 serialization'''
            return make_aware(datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S'))

        for k, v in data.items():
            if k.endswith('_datetime'):
                data[k] = import_datetime(v)
        return cls(**data)

    def export_json(self):
        """Return a dict of the exception's fields; datetimes are naive strings."""
        def export_datetime(dt):
            '''Export datetime as a naive ISO8601 serialization'''
            return make_naive(dt).strftime('%Y-%m-%d %H:%M:%S')

        return {
            'label': self.label,
            'start_datetime': export_datetime(self.start_datetime),
            'end_datetime': export_datetime(self.end_datetime),
            'external_id': self.external_id,
            'recurrence_id': self.recurrence_id,
            'update_datetime': export_datetime(self.update_datetime),
        }