chrono/chrono/agendas/models.py

693 lines
28 KiB
Python

# -*- coding: utf-8 -*-
# chrono - agendas system
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import fractions
import requests
import vobject
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
from django.db import models, transaction
from django.db.models import Q
from django.utils.dates import WEEKDAYS
from django.utils.encoding import force_text, python_2_unicode_compatible
from django.utils.formats import date_format
from django.utils.text import slugify
from django.utils.timezone import localtime, now, make_aware, make_naive, is_aware
from django.utils.translation import ugettext_lazy as _
from jsonfield import JSONField
from ..interval import Intervals
# Choices offered for the ``kind`` field of an agenda, as
# (stored value, translatable display label) pairs.
AGENDA_KINDS = (
    ('events', _('Events')),
    ('meetings', _('Meetings')),
)
def is_midnight(dtime):
    """Tell whether *dtime* falls on 00:00 once converted to local time.

    Only the hour and the minute are compared; seconds and microseconds are
    not looked at.
    """
    local_dtime = localtime(dtime)
    return (local_dtime.hour, local_dtime.minute) == (0, 0)
class ICSError(Exception):
    """Error raised when an iCalendar source cannot be fetched or used."""
class AgendaImportError(Exception):
    """Error raised when an agenda cannot be imported from its JSON form."""
class Agenda(models.Model):
    """An agenda, holding either events or meetings (see ``kind``).

    Booking delays are expressed in days.  ``edit_role`` and ``view_role``
    are optional groups granting management / read access to non-staff users.
    """
    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160)
    kind = models.CharField(_('Kind'), max_length=20, choices=AGENDA_KINDS, default='events')
    minimal_booking_delay = models.PositiveIntegerField(
        _('Minimal booking delay (in days)'), default=1)
    maximal_booking_delay = models.PositiveIntegerField(
        _('Maximal booking delay (in days)'), default=56)  # eight weeks
    edit_role = models.ForeignKey(Group, blank=True, null=True, default=None,
                                  related_name='+', verbose_name=_('Edit Role'),
                                  on_delete=models.SET_NULL)
    view_role = models.ForeignKey(Group, blank=True, null=True, default=None,
                                  related_name='+', verbose_name=_('View Role'),
                                  on_delete=models.SET_NULL)

    class Meta:
        ordering = ['label']

    def save(self, *args, **kwargs):
        """Save the agenda, deriving a free slug from the label when unset."""
        if not self.slug:
            base_slug = slugify(self.label)
            slug = base_slug
            i = 1
            # exists() instead of get(): nothing enforces slug uniqueness, so
            # get() could raise MultipleObjectsReturned on duplicated slugs.
            while Agenda.objects.filter(slug=slug).exists():
                slug = '%s-%s' % (base_slug, i)
                i += 1
            self.slug = slug
        super(Agenda, self).save(*args, **kwargs)

    def get_absolute_url(self):
        """Return the URL of the manager view for this agenda."""
        return reverse('chrono-manager-agenda-view', kwargs={'pk': self.id})

    def can_be_managed(self, user):
        """Return True for staff users and for members of the edit role."""
        if user.is_staff:
            return True
        group_ids = [x.id for x in user.groups.all()]
        return bool(self.edit_role_id in group_ids)

    def can_be_viewed(self, user):
        """Return True if *user* can manage the agenda or has the view role."""
        if self.can_be_managed(user):
            return True
        group_ids = [x.id for x in user.groups.all()]
        return bool(self.view_role_id in group_ids)

    def get_base_meeting_duration(self):
        """Return the greatest common divisor of all meeting type durations.

        Raises ValueError when the agenda has no meeting type.
        """
        try:
            # fractions.gcd was removed in Python 3.9
            from math import gcd
        except ImportError:
            # Python 2
            gcd = fractions.gcd
        durations = [x.duration for x in MeetingType.objects.filter(agenda=self)]
        if not durations:
            raise ValueError()
        base_duration = durations[0]
        for duration in durations[1:]:
            base_duration = gcd(duration, base_duration)
        return base_duration

    def export_json(self):
        """Serialize the agenda and its related objects to a plain dict.

        Roles are exported by group name.  Events are included for 'events'
        agendas; meeting types and desks for 'meetings' agendas.
        """
        agenda = {
            'label': self.label,
            'slug': self.slug,
            'kind': self.kind,
            'minimal_booking_delay': self.minimal_booking_delay,
            'maximal_booking_delay': self.maximal_booking_delay,
            'permissions': {
                'view': self.view_role.name if self.view_role else None,
                'edit': self.edit_role.name if self.edit_role else None,
            }
        }
        if self.kind == 'events':
            agenda['events'] = [x.export_json() for x in self.event_set.all()]
        elif self.kind == 'meetings':
            agenda['meetingtypes'] = [x.export_json() for x in self.meetingtype_set.all()]
            agenda['desks'] = [desk.export_json() for desk in self.desk_set.all()]
        return agenda

    @classmethod
    def import_json(cls, data, overwrite=False):
        """Create an agenda (or reuse the one with the same slug) from *data*.

        *data* has the structure produced by ``export_json``.  With
        *overwrite*, the existing events (or meeting types and desks) of the
        agenda are deleted before the imported ones are created.

        Returns True if the agenda itself was created, False if it already
        existed.  Raises AgendaImportError when a role named in the
        permissions does not match any group.
        """
        data = data.copy()
        permissions = data.pop('permissions') or {}
        if data['kind'] == 'events':
            events = data.pop('events')
        elif data['kind'] == 'meetings':
            meetingtypes = data.pop('meetingtypes')
            desks = data.pop('desks')
        for permission in ('view', 'edit'):
            role_name = permissions.get(permission)
            if role_name:
                try:
                    data[permission + '_role'] = Group.objects.get(name=role_name)
                except Group.DoesNotExist:
                    raise AgendaImportError(_('Missing "%s" role') % role_name)
        agenda, created = cls.objects.get_or_create(slug=data['slug'], defaults=data)
        # The per-object dicts are copied before the 'agenda' key is added so
        # that the caller's nested structures are left without that key.
        if data['kind'] == 'events':
            if overwrite:
                Event.objects.filter(agenda=agenda).delete()
            for event_data in events:
                Event.import_json(dict(event_data, agenda=agenda)).save()
        elif data['kind'] == 'meetings':
            if overwrite:
                MeetingType.objects.filter(agenda=agenda).delete()
                Desk.objects.filter(agenda=agenda).delete()
            for type_data in meetingtypes:
                MeetingType.import_json(dict(type_data, agenda=agenda)).save()
            for desk in desks:
                Desk.import_json(dict(desk, agenda=agenda)).save()
        return created
# (weekday number, weekday name) pairs in ascending weekday number order,
# used as choices for TimePeriod.weekday.
WEEKDAYS_LIST = [(number, WEEKDAYS[number]) for number in sorted(WEEKDAYS)]
@python_2_unicode_compatible
class TimeSlot(object):
    """A candidate meeting slot: a start time, a meeting type and a desk.

    The end time is the start time plus the meeting type duration (in
    minutes); ``id`` is "<meeting type id>:<start as YYYY-MM-DD-HHMM>".
    """

    def __init__(self, start_datetime, meeting_type, desk):
        length = datetime.timedelta(minutes=meeting_type.duration)
        start_stamp = start_datetime.strftime('%Y-%m-%d-%H%M')
        self.start_datetime = start_datetime
        self.end_datetime = start_datetime + length
        self.meeting_type = meeting_type
        self.desk = desk
        self.id = '%s:%s' % (meeting_type.id, start_stamp)

    def __str__(self):
        """Return the start date/time formatted with DATETIME_FORMAT."""
        return date_format(self.start_datetime, format='DATETIME_FORMAT')
@python_2_unicode_compatible
class TimePeriod(models.Model):
    """A weekly opening period (weekday, start time, end time) of a desk.

    ``weekday`` is compared with ``datetime.weekday()`` values, i.e. Monday
    is 0.
    """
    weekday = models.IntegerField(_('Week day'), choices=WEEKDAYS_LIST)
    start_time = models.TimeField(_('Start'))
    end_time = models.TimeField(_('End'))
    desk = models.ForeignKey('Desk', on_delete=models.CASCADE)

    class Meta:
        ordering = ['weekday', 'start_time']

    def __str__(self):
        # an arrow separates start and end times; without a separator both
        # times would be glued together (e.g. "10:0012:00")
        return u'%s / %s \u2192 %s' % (
            force_text(WEEKDAYS[self.weekday]),
            date_format(self.start_time, 'TIME_FORMAT'),
            date_format(self.end_time, 'TIME_FORMAT'))

    @property
    def weekday_str(self):
        """Name of the weekday, as found in django.utils.dates.WEEKDAYS."""
        return WEEKDAYS[self.weekday]

    @classmethod
    def import_json(cls, data):
        """Build an unsaved instance using *data* as constructor arguments."""
        return cls(**data)

    def export_json(self):
        """Serialize to a dict, times being formatted as HH:MM."""
        return {
            'weekday': self.weekday,
            'start_time': self.start_time.strftime('%H:%M'),
            'end_time': self.end_time.strftime('%H:%M'),
        }

    def get_time_slots(self, min_datetime, max_datetime, meeting_type):
        """Yield the TimeSlot objects of this period for *meeting_type*.

        *min_datetime* and *max_datetime* are aware datetimes; computations
        are done on their naive conversions and slots are made aware again
        when yielded.  Successive slots start one "base meeting duration" of
        the agenda apart, every week on the weekday of the period.

        NOTE(review): on the first matching day slots start at
        ``start_time`` even when that is earlier than *min_datetime*;
        presumably callers filter those out, to be confirmed.
        """
        meeting_duration = datetime.timedelta(minutes=meeting_type.duration)
        # step between two consecutive slot starts
        duration = datetime.timedelta(minutes=self.desk.agenda.get_base_meeting_duration())

        min_datetime = make_naive(min_datetime)
        max_datetime = make_naive(max_datetime)

        # move to the weekday of the period within the week of min_datetime,
        # or to the following week if that day is already behind
        real_min_datetime = min_datetime + datetime.timedelta(
            days=self.weekday - min_datetime.weekday())
        if real_min_datetime < min_datetime:
            real_min_datetime += datetime.timedelta(days=7)

        event_datetime = real_min_datetime.replace(
            hour=self.start_time.hour, minute=self.start_time.minute,
            second=0, microsecond=0)
        while event_datetime < max_datetime:
            end_time = event_datetime + meeting_duration
            next_time = event_datetime + duration
            # the meeting would end after the period, or the next step would
            # fall on another day: jump to the same weekday of the next week
            if end_time.time() > self.end_time or event_datetime.date() != next_time.date():
                # back to morning
                event_datetime = event_datetime.replace(
                    hour=self.start_time.hour, minute=self.start_time.minute)
                # but next week
                event_datetime += datetime.timedelta(days=7)
                next_time = event_datetime + duration

            if event_datetime > max_datetime:
                break

            yield TimeSlot(start_datetime=make_aware(event_datetime),
                           meeting_type=meeting_type, desk=self.desk)
            event_datetime = next_time
class MeetingType(models.Model):
    """A kind of meeting bookable in an agenda, with a duration in minutes."""
    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160)
    duration = models.IntegerField(_('Duration (in minutes)'), default=30)

    class Meta:
        ordering = ['duration', 'label']

    def save(self, *args, **kwargs):
        """Save the meeting type, deriving a slug free within the agenda."""
        if not self.slug:
            base_slug = slugify(self.label)
            slug = base_slug
            i = 1
            # exists() instead of get(): nothing enforces slug uniqueness, so
            # get() could raise MultipleObjectsReturned on duplicated slugs.
            while MeetingType.objects.filter(slug=slug, agenda=self.agenda).exists():
                slug = '%s-%s' % (base_slug, i)
                i += 1
            self.slug = slug
        super(MeetingType, self).save(*args, **kwargs)

    @classmethod
    def import_json(cls, data):
        """Build an unsaved instance using *data* as constructor arguments."""
        return cls(**data)

    def export_json(self):
        """Serialize label, slug and duration to a dict."""
        return {
            'label': self.label,
            'slug': self.slug,
            'duration': self.duration,
        }
@python_2_unicode_compatible
class Event(models.Model):
    """A bookable date/time of an agenda, with a limited number of places.

    ``meeting_type`` and ``desk`` are optional.  ``full`` is a stored flag
    recomputed by ``check_full`` on every save.
    """
    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    start_datetime = models.DateTimeField(_('Date/time'))
    places = models.PositiveIntegerField(_('Places'))
    waiting_list_places = models.PositiveIntegerField(
        _('Places in waiting list'), default=0)
    label = models.CharField(_('Label'), max_length=150, null=True, blank=True,
                             help_text=_('Optional label to identify this date.'))
    description = models.TextField(_('Description'), null=True, blank=True,
                                   help_text=_('Optional event description.'))
    full = models.BooleanField(default=False)
    meeting_type = models.ForeignKey(MeetingType, null=True, on_delete=models.CASCADE)
    desk = models.ForeignKey('Desk', null=True, on_delete=models.CASCADE)

    class Meta:
        ordering = ['agenda', 'start_datetime', 'label']

    def __str__(self):
        if self.label:
            return self.label
        return date_format(localtime(self.start_datetime), format='DATETIME_FORMAT')

    def save(self, *args, **kwargs):
        """Refresh the ``full`` flag, then save."""
        self.check_full()
        return super(Event, self).save(*args, **kwargs)

    def check_full(self):
        """Recompute ``full`` from current bookings (the event is not saved).

        Without a waiting list the event is full once all places are booked;
        with a waiting list it is full once the waiting list itself is full.
        """
        self.full = bool(
            (self.booked_places >= self.places and self.waiting_list_places == 0) or
            (self.waiting_list_places and self.waiting_list >= self.waiting_list_places))

    def in_bookable_period(self):
        """Tell whether the event can be booked right now.

        The current local date has to be within the minimal and maximal
        booking delays of the agenda, and the event must not have started.
        """
        # read once, so both delay checks are done against the same date
        today = localtime(now()).date()
        minimal_delay = datetime.timedelta(days=self.agenda.minimal_booking_delay)
        if today > localtime(self.start_datetime - minimal_delay).date():
            return False
        if self.agenda.maximal_booking_delay:
            maximal_delay = datetime.timedelta(days=self.agenda.maximal_booking_delay)
            if today <= localtime(self.start_datetime - maximal_delay).date():
                return False
        if self.start_datetime < now():
            # past the event date, we may want in the future to allow for some
            # extra late booking but it's forbidden for now.
            return False
        return True

    @property
    def booked_places(self):
        """Number of bookings neither cancelled nor in the waiting list."""
        return self.booking_set.filter(cancellation_datetime__isnull=True,
                                       in_waiting_list=False).count()

    @property
    def waiting_list(self):
        """Number of non cancelled bookings in the waiting list."""
        return self.booking_set.filter(cancellation_datetime__isnull=True,
                                       in_waiting_list=True).count()

    @property
    def end_datetime(self):
        """Start date/time plus the meeting type duration.

        NOTE(review): reads ``self.meeting_type.duration`` unconditionally,
        so it looks usable only on events that have a meeting type.
        """
        return self.start_datetime + datetime.timedelta(minutes=self.meeting_type.duration)

    def get_absolute_url(self):
        """Return the URL of the manager edit view for this event."""
        return reverse('chrono-manager-event-edit', kwargs={'pk': self.id})

    @classmethod
    def import_json(cls, data):
        """Build an unsaved event from *data*.

        ``start_datetime`` is expected as a naive 'YYYY-MM-DD HH:MM:SS'
        string and is made aware; other keys are passed to the constructor.
        """
        # work on a copy so that the caller's dict keeps its string value and
        # can be imported again
        data = dict(data)
        data['start_datetime'] = make_aware(datetime.datetime.strptime(
            data['start_datetime'], '%Y-%m-%d %H:%M:%S'))
        return cls(**data)

    def export_json(self):
        """Serialize to a dict; the start is a naive 'YYYY-MM-DD HH:MM:SS' string."""
        return {
            'start_datetime': make_naive(self.start_datetime).strftime('%Y-%m-%d %H:%M:%S'),
            'places': self.places,
            'waiting_list_places': self.waiting_list_places,
            'label': self.label,
            'description': self.description,
        }
class Booking(models.Model):
    """A booking of an event, possibly cancelled or in the waiting list."""
    event = models.ForeignKey(Event, on_delete=models.CASCADE)
    extra_data = JSONField(null=True)
    cancellation_datetime = models.DateTimeField(null=True)
    in_waiting_list = models.BooleanField(default=False)
    creation_datetime = models.DateTimeField(auto_now_add=True)
    # primary booking is used to group multiple bookings together
    primary_booking = models.ForeignKey('self', null=True,
                                        on_delete=models.CASCADE,
                                        related_name='secondary_booking_set')
    label = models.CharField(max_length=250, blank=True)
    user_display_label = models.CharField(
        verbose_name=_('Label displayed to user'), max_length=250, blank=True)
    user_name = models.CharField(max_length=250, blank=True)
    backoffice_url = models.URLField(blank=True)

    def save(self, *args, **kwargs):
        """Save the booking; also save the event if its ``full`` flag changed."""
        with transaction.atomic():
            super(Booking, self).save(*args, **kwargs)
            initial_value = self.event.full
            self.event.check_full()
            if self.event.full != initial_value:
                self.event.save()

    def cancel(self):
        """Mark this booking and its secondary bookings as cancelled now."""
        timestamp = now()
        with transaction.atomic():
            self.secondary_booking_set.update(cancellation_datetime=timestamp)
            self.cancellation_datetime = timestamp
            self.save()

    def accept(self):
        """Take this booking and its secondary bookings out of the waiting list."""
        self.in_waiting_list = False
        with transaction.atomic():
            self.secondary_booking_set.update(in_waiting_list=False)
            self.save()

    def get_ics(self, request=None):
        """Return the booking serialized as an iCalendar document.

        The description, location, comment and url properties are taken from
        the query string of *request* when given and non empty, else from
        ``extra_data``.  DTEND is only set when the event has a meeting type.
        """
        ics = vobject.iCalendar()
        ics.add('prodid').value = '-//Entr\'ouvert//NON SGML Publik'
        vevent = vobject.newFromBehavior('vevent')
        vevent.add('uid').value = '%s-%s-%s' % (
            self.event.start_datetime.isoformat(), self.event.agenda.pk, self.pk)
        vevent.add('summary').value = self.user_display_label or self.label
        vevent.add('dtstart').value = self.event.start_datetime
        if self.user_name:
            vevent.add('attendee').value = self.user_name
        if self.event.meeting_type:
            vevent.add('dtend').value = self.event.start_datetime + datetime.timedelta(
                minutes=self.event.meeting_type.duration)

        # extra_data is nullable, a missing value must not break the export
        extra_data = self.extra_data or {}
        for field in ('description', 'location', 'comment', 'url'):
            field_value = (request and request.GET.get(field)) or extra_data.get(field)
            if field_value:
                vevent.add(field).value = field_value
        ics.add(vevent)
        return ics.serialize()
@python_2_unicode_compatible
class Desk(models.Model):
    """A desk of an agenda, with weekly time periods and dated exceptions."""
    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160)
    timeperiod_exceptions_remote_url = models.URLField(
        _('URL to fetch time period exceptions from'),
        blank=True, max_length=500)

    def __str__(self):
        return self.label

    class Meta:
        ordering = ['label']

    def save(self, *args, **kwargs):
        """Save the desk, deriving a slug free within the agenda when unset."""
        if not self.slug:
            base_slug = slugify(self.label)
            slug = base_slug
            i = 1
            # exists() instead of get(): nothing enforces slug uniqueness, so
            # get() could raise MultipleObjectsReturned on duplicated slugs.
            while Desk.objects.filter(slug=slug, agenda=self.agenda).exists():
                slug = '%s-%s' % (base_slug, i)
                i += 1
            self.slug = slug
        super(Desk, self).save(*args, **kwargs)

    @classmethod
    def import_json(cls, data):
        """Get or create the desk described by *data*, with its related objects.

        *data* must have 'timeperiods' and 'exceptions' lists; remaining keys
        are the lookup used to get or create the desk.  Time periods and
        exceptions are always created.  Returns the desk.
        """
        # copies are used all along so that the caller's structures are left
        # untouched
        data = dict(data)
        timeperiods = data.pop('timeperiods')
        exceptions = data.pop('exceptions')
        instance, created = cls.objects.get_or_create(**data)
        for timeperiod in timeperiods:
            TimePeriod.import_json(dict(timeperiod, desk=instance)).save()
        for exception in exceptions:
            TimePeriodException.import_json(dict(exception, desk=instance)).save()
        return instance

    def export_json(self):
        """Serialize the desk with its time periods and exceptions."""
        return {
            'label': self.label,
            'slug': self.slug,
            'timeperiods': [time_period.export_json() for time_period in self.timeperiod_set.all()],
            'exceptions': [exception.export_json() for exception in self.timeperiodexception_set.all()]
        }

    def get_exceptions_within_two_weeks(self):
        """Return the exceptions to display by default.

        Those are the exceptions not finished yet that either end within 14
        days or have already started.  When there is none, the next exception
        to come is returned as a one element list (or an empty list).
        """
        # A single aware "now" is used for all bounds.  Building the horizon
        # from a naive datetime.today() would go through make_aware(), which
        # can fail on local times that do not exist or are ambiguous around
        # DST changes.
        current_datetime = now()
        in_two_weeks = current_datetime + datetime.timedelta(days=14)
        exceptions = self.timeperiodexception_set.filter(
            end_datetime__gte=current_datetime).filter(
                Q(end_datetime__lte=in_two_weeks) | Q(start_datetime__lt=current_datetime))
        if exceptions.exists():
            return exceptions
        # if none found within the 2 coming weeks, return the next one
        next_exception = self.timeperiodexception_set.filter(
            start_datetime__gte=current_datetime).order_by('start_datetime').first()
        if next_exception:
            return [next_exception]
        return []

    def are_all_exceptions_displayed(self):
        """Tell if ``get_exceptions_within_two_weeks`` returns every exception."""
        in_two_weeks = self.get_exceptions_within_two_weeks()
        return self.timeperiodexception_set.count() == len(in_two_weeks)

    def create_timeperiod_exceptions_from_remote_ics(self, url):
        """Fetch the calendar at *url* and synchronise exceptions by event UID.

        Returns the number of exceptions created.  Raises ICSError when the
        calendar cannot be retrieved or parsed.
        """
        # NOTE(review): no timeout is given to requests.get(), the call may
        # block for as long as the remote server keeps the connection open.
        try:
            response = requests.get(url, proxies=settings.REQUESTS_PROXIES)
            response.raise_for_status()
        except requests.HTTPError as e:
            raise ICSError(
                _('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).') %
                {'url': url, 'status_code': e.response.status_code})
        except requests.RequestException as e:
            raise ICSError(
                _('Failed to retrieve remote calendar (%(url)s, %(exception)s).') %
                {'url': url, 'exception': e})

        return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)

    def remove_timeperiod_exceptions_from_remote_ics(self):
        """Delete the exceptions of this desk that have an external id."""
        TimePeriodException.objects.filter(desk=self).exclude(external_id='').delete()

    def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False, recurring_days=600):
        """Create or update exceptions of this desk from iCalendar *data*.

        Existing exceptions are matched on (desk, recurrence id) plus the
        event UID when *keep_synced_by_uid* is set, or the event summary
        otherwise.  Recurring events are expanded up to *recurring_days* days
        from now.  With *keep_synced_by_uid*, exceptions having an external
        id that were not refreshed by this run are deleted afterwards.

        Returns the number of exceptions created.  Raises ICSError if the
        data cannot be parsed, if an event has no start date, or if there is
        no event at all (that last check is skipped with
        *keep_synced_by_uid*).
        """
        try:
            parsed = vobject.readOne(data)
        except vobject.base.ParseError:
            raise ICSError(_('File format is invalid.'))

        total_created = 0

        if not parsed.contents.get('vevent') and not keep_synced_by_uid:
            raise ICSError(_('The file doesn\'t contain any events.'))

        with transaction.atomic():
            update_datetime = now()
            for vevent in parsed.contents.get('vevent', []):
                if 'summary' in vevent.contents:
                    summary = force_text(vevent.contents['summary'][0].value)
                else:
                    summary = _('Exception')

                try:
                    start_dt = vevent.dtstart.value
                    # plain dates are turned into datetimes at midnight
                    if not isinstance(start_dt, datetime.datetime):
                        start_dt = datetime.datetime.combine(start_dt, datetime.datetime.min.time())
                    if not is_aware(start_dt):
                        start_dt = make_aware(start_dt)
                except AttributeError:
                    raise ICSError(_('Event "%s" has no start date.') % summary)

                try:
                    end_dt = vevent.dtend.value
                    if not isinstance(end_dt, datetime.datetime):
                        end_dt = datetime.datetime.combine(end_dt, datetime.datetime.min.time())
                    if not is_aware(end_dt):
                        end_dt = make_aware(end_dt)
                    duration = end_dt - start_dt
                except AttributeError:
                    # no DTEND, fall back on DURATION
                    try:
                        duration = vevent.duration.value
                        end_dt = start_dt + duration
                    except AttributeError:
                        # events without end date are considered as ending the same day
                        end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time()))
                        duration = end_dt - start_dt

                # values to set on the exception
                event = {}
                event['start_datetime'] = start_dt
                event['end_datetime'] = end_dt
                event['label'] = summary

                # lookup identifying the exception
                kwargs = {}
                kwargs['desk'] = self
                kwargs['recurrence_id'] = 0
                if keep_synced_by_uid:
                    # NOTE(review): an event without UID makes this raise
                    # KeyError, not ICSError.
                    kwargs['external_id'] = vevent.contents['uid'][0].value
                else:
                    kwargs['label'] = summary

                if not vevent.rruleset:
                    # classical event
                    obj, created = TimePeriodException.objects.update_or_create(defaults=event, **kwargs)
                    if created:
                        total_created += 1
                elif vevent.rruleset.count():
                    # recurring event until recurring_days in the future
                    from_dt = start_dt
                    until_dt = update_datetime + datetime.timedelta(days=recurring_days)
                    # bounds must be naive or aware like the occurrences are
                    if not is_aware(vevent.rruleset[0]):
                        from_dt = make_naive(from_dt)
                        until_dt = make_naive(until_dt)
                    # stays at -1 if there is no occurrence in the window, the
                    # cleaning below then removes all occurrences
                    i = -1
                    for i, start_dt in enumerate(vevent.rruleset.between(from_dt, until_dt, inc=True)):
                        # recompute start_dt and end_dt from occurrences and duration
                        if not is_aware(start_dt):
                            start_dt = make_aware(start_dt)
                        end_dt = start_dt + duration
                        kwargs['recurrence_id'] = i
                        event['start_datetime'] = start_dt
                        event['end_datetime'] = end_dt
                        if end_dt < update_datetime:
                            # occurrences already over are updated if they
                            # exist but never created
                            TimePeriodException.objects.filter(**kwargs).update(**event)
                        else:
                            obj, created = TimePeriodException.objects.update_or_create(defaults=event, **kwargs)
                            if created:
                                total_created += 1
                    # delete unseen occurrences
                    kwargs.pop('recurrence_id', None)
                    TimePeriodException.objects.filter(recurrence_id__gt=i, **kwargs).delete()

            if keep_synced_by_uid:
                # delete all outdated exceptions from remote calendar
                TimePeriodException.objects.filter(
                    update_datetime__lt=update_datetime,
                    desk=self).exclude(external_id='').delete()

        return total_created

    def get_opening_hours(self, date):
        """Return the open intervals of the desk on *date*.

        Time periods defined for the weekday of *date* are added, then the
        exceptions overlapping that day are removed.  The value returned is
        the result of ``Intervals.search`` over the whole day.
        """
        openslots = Intervals()
        for timeperiod in self.timeperiod_set.filter(weekday=date.weekday()):
            start_datetime = make_aware(datetime.datetime.combine(date, timeperiod.start_time))
            end_datetime = make_aware(datetime.datetime.combine(date, timeperiod.end_time))
            openslots.add(start_datetime, end_datetime)

        aware_date = make_aware(datetime.datetime(date.year, date.month, date.day))
        aware_next_date = aware_date + datetime.timedelta(days=1)
        for exception in self.timeperiodexception_set.filter(
                start_datetime__lt=aware_next_date,
                end_datetime__gt=aware_date):
            openslots.remove(exception.start_datetime, exception.end_datetime)

        return openslots.search(aware_date, aware_next_date)
@python_2_unicode_compatible
class TimePeriodException(models.Model):
    """A dated interval during which a desk is not open.

    ``external_id`` is set on exceptions synchronised from a remote calendar
    and ``recurrence_id`` numbers the occurrences of a recurring event.
    """
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
    external_id = models.CharField(_('External ID'), max_length=256, blank=True)
    label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
    start_datetime = models.DateTimeField(_('Exception start time'))
    end_datetime = models.DateTimeField(_('Exception end time'))
    update_datetime = models.DateTimeField(auto_now=True)
    recurrence_id = models.PositiveIntegerField(_('Recurrence ID'), default=0)

    class Meta:
        ordering = ['start_datetime']

    def __str__(self):
        # an arrow separates both ends of a range; without a separator the
        # two dates would be glued together
        if is_midnight(self.start_datetime) and is_midnight(self.end_datetime):
            # if both dates are at midnight don't include the time part
            if self.end_datetime == self.start_datetime + datetime.timedelta(days=1):
                # a single day
                exc_repr = u'%s' % date_format(localtime(self.start_datetime), 'SHORT_DATE_FORMAT')
            else:
                exc_repr = u'%s \u2192 %s' % (
                    date_format(localtime(self.start_datetime), 'SHORT_DATE_FORMAT'),
                    date_format(localtime(self.end_datetime), 'SHORT_DATE_FORMAT'))
        else:
            if localtime(self.start_datetime).date() == localtime(self.end_datetime).date():
                # same day
                exc_repr = u'%s \u2192 %s' % (
                    date_format(localtime(self.start_datetime), 'SHORT_DATETIME_FORMAT'),
                    date_format(localtime(self.end_datetime), 'TIME_FORMAT'))
            else:
                exc_repr = u'%s \u2192 %s' % (
                    date_format(localtime(self.start_datetime), 'SHORT_DATETIME_FORMAT'),
                    date_format(localtime(self.end_datetime), 'SHORT_DATETIME_FORMAT'))

        if self.label:
            exc_repr = u'%s (%s)' % (self.label, exc_repr)

        return exc_repr

    def clean(self):
        """Refuse exceptions covering an event that has active bookings."""
        super(TimePeriodException, self).clean()
        if self.has_booking_within_time_slot():
            raise ValidationError(_('One or several bookings exists within this time slot.'))

    def has_booking_within_time_slot(self):
        """Tell if an event of the desk with a non cancelled booking starts
        within [start_datetime, end_datetime[.

        Returns False when either bound is missing.
        """
        if not (self.start_datetime and self.end_datetime):
            # incomplete time period, can't tell
            return False

        # the range test is done by the database instead of loading every
        # booked event of the desk
        return Event.objects.filter(
            desk=self.desk,
            booking__isnull=False,
            booking__cancellation_datetime__isnull=True,
            start_datetime__gte=self.start_datetime,
            start_datetime__lt=self.end_datetime).exists()

    @classmethod
    def import_json(cls, data):
        """Build an unsaved exception from *data*.

        Values of keys ending with '_datetime' are parsed from naive
        'YYYY-MM-DD HH:MM:SS' strings and made aware; other values are passed
        unchanged.  *data* itself is not modified.
        """
        def import_datetime(s):
            '''Import datetime as a naive ISO8601 serialization'''
            return make_aware(datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S'))

        # a new dict is built so that the caller's one keeps its strings and
        # can be imported again
        converted = {}
        for key, value in data.items():
            if key.endswith('_datetime'):
                value = import_datetime(value)
            converted[key] = value
        return cls(**converted)

    def export_json(self):
        """Serialize to a dict, datetimes as naive 'YYYY-MM-DD HH:MM:SS' strings."""
        def export_datetime(dt):
            '''Export datetime as a naive ISO8601 serialization'''
            return make_naive(dt).strftime('%Y-%m-%d %H:%M:%S')

        return {
            'label': self.label,
            'start_datetime': export_datetime(self.start_datetime),
            'end_datetime': export_datetime(self.end_datetime),
            'external_id': self.external_id,
            'recurrence_id': self.recurrence_id,
            'update_datetime': export_datetime(self.update_datetime),
        }