chrono/chrono/agendas/models.py

505 lines
19 KiB
Python

# -*- coding: utf-8 -*-
# chrono - agendas system
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import vobject
from django.contrib.auth.models import Group
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
from django.db import models
from django.db import transaction
from django.utils.dateformat import DateFormat
from django.utils.dates import WEEKDAYS
from django.utils.encoding import force_text
from django.utils.formats import date_format, get_format
from django.utils.text import slugify
from django.utils.timezone import localtime, now, make_aware
from django.utils.translation import ugettext_lazy as _
from jsonfield import JSONField
AGENDA_KINDS = (
('events', _('Events')),
('meetings', _('Meetings')),
)
def is_midnight(dtime):
dtime = localtime(dtime)
return dtime.hour == 0 and dtime.minute == 0
class ICSError(Exception):
pass
class Agenda(models.Model):
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'))
kind = models.CharField(_('Kind'), max_length=20, choices=AGENDA_KINDS, default='events')
minimal_booking_delay = models.PositiveIntegerField(
_('Minimal booking delay (in days)'), default=1)
maximal_booking_delay = models.PositiveIntegerField(
_('Maximal booking delay (in days)'), default=56) # eight weeks
edit_role = models.ForeignKey(Group, blank=True, null=True, default=None,
related_name='+', verbose_name=_('Edit Role'))
view_role = models.ForeignKey(Group, blank=True, null=True, default=None,
related_name='+', verbose_name=_('View Role'))
class Meta:
ordering = ['label']
def save(self, *args, **kwargs):
if not self.slug:
base_slug = slugify(self.label)
slug = base_slug
i = 1
while True:
try:
Agenda.objects.get(slug=slug)
except self.DoesNotExist:
break
slug = '%s-%s' % (base_slug, i)
i += 1
self.slug = slug
super(Agenda, self).save(*args, **kwargs)
def get_absolute_url(self):
return reverse('chrono-manager-agenda-view', kwargs={'pk': self.id})
def can_be_managed(self, user):
if user.is_staff:
return True
group_ids = [x.id for x in user.groups.all()]
return bool(self.edit_role_id in group_ids)
def can_be_viewed(self, user):
if self.can_be_managed(user):
return True
group_ids = [x.id for x in user.groups.all()]
return bool(self.view_role_id in group_ids)
def export_json(self):
agenda = {
'label': self.label,
'slug': self.slug,
'kind': self.kind,
'minimal_booking_delay': self.minimal_booking_delay,
'maximal_booking_delay': self.maximal_booking_delay,
'permissions': {
'view': self.view_role.name if self.view_role else None,
'edit': self.edit_role.name if self.edit_role else None,
}
}
if self.kind == 'events':
agenda['events'] = [x.export_json() for x in self.event_set.all()]
elif self.kind == 'meetings':
agenda['meetingtypes'] = [x.export_json() for x in self.meetingtype_set.all()]
agenda['desks'] = [desk.export_json() for desk in self.desk_set.all()]
return agenda
@classmethod
def import_json(cls, data, overwrite=False):
data = data.copy()
permissions = data.pop('permissions')
if data['kind'] == 'events':
events = data.pop('events')
elif data['kind'] == 'meetings':
meetingtypes = data.pop('meetingtypes')
desks = data.pop('desks')
agenda, created = cls.objects.get_or_create(slug=data['slug'], defaults=data)
if data['kind'] == 'events':
if overwrite:
Event.objects.filter(agenda=agenda).delete()
for event_data in events:
event_data['agenda'] = agenda
Event.import_json(event_data).save()
elif data['kind'] == 'meetings':
if overwrite:
MeetingType.objects.filter(agenda=agenda).delete()
Desk.objects.filter(agenda=agenda).delete()
for type_data in meetingtypes:
type_data['agenda'] = agenda
MeetingType.import_json(type_data).save()
for desk in desks:
desk['agenda'] = agenda
Desk.import_json(desk).save()
WEEKDAYS_LIST = sorted(WEEKDAYS.items(), key=lambda x: x[0])
class TimeSlot(object):
def __init__(self, start_datetime, meeting_type, desk):
self.start_datetime = start_datetime
self.end_datetime = start_datetime + datetime.timedelta(minutes=meeting_type.duration)
self.meeting_type = meeting_type
self.id = '%s:%s' % (self.meeting_type.id, start_datetime.strftime('%Y-%m-%d-%H%M'))
self.desk = desk
def __unicode__(self):
return date_format(self.start_datetime, format='DATETIME_FORMAT')
class TimePeriod(models.Model):
weekday = models.IntegerField(_('Week day'), choices=WEEKDAYS_LIST)
start_time = models.TimeField(_('Start'))
end_time = models.TimeField(_('End'))
desk = models.ForeignKey('Desk', on_delete=models.CASCADE)
class Meta:
ordering = ['weekday', 'start_time']
def __unicode__(self):
return u'%s / %s%s' % (
force_text(WEEKDAYS[self.weekday]),
date_format(self.start_time, 'TIME_FORMAT'),
date_format(self.end_time, 'TIME_FORMAT'))
@property
def weekday_str(self):
return WEEKDAYS[self.weekday]
@classmethod
def import_json(cls, data):
return cls(**data)
def export_json(self):
return {
'weekday': self.weekday,
'start_time': self.start_time.strftime('%H:%M'),
'end_time': self.end_time.strftime('%H:%M'),
}
def get_time_slots(self, min_datetime, max_datetime, meeting_type):
duration = datetime.timedelta(minutes=meeting_type.duration)
real_min_datetime = localtime(min_datetime) + datetime.timedelta(
days=self.weekday - min_datetime.weekday())
if real_min_datetime < min_datetime:
real_min_datetime += datetime.timedelta(days=7)
event_datetime = real_min_datetime.replace(hour=self.start_time.hour,
minute=self.start_time.minute, second=0, microsecond=0)
while event_datetime < max_datetime:
end_time = event_datetime + duration
if end_time.time() > self.end_time:
# back to morning
event_datetime = event_datetime.replace(hour=self.start_time.hour, minute=self.start_time.minute)
# but next week
event_datetime += datetime.timedelta(days=7)
end_time = event_datetime + duration
if event_datetime > max_datetime:
break
yield TimeSlot(start_datetime=event_datetime, meeting_type=meeting_type, desk=self.desk)
event_datetime = end_time
class MeetingType(models.Model):
agenda = models.ForeignKey(Agenda)
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'))
duration = models.IntegerField(_('Duration (in minutes)'), default=30)
class Meta:
ordering = ['label']
def save(self, *args, **kwargs):
if not self.slug:
base_slug = slugify(self.label)
slug = base_slug
i = 1
while True:
try:
MeetingType.objects.get(slug=slug, agenda=self.agenda)
except self.DoesNotExist:
break
slug = '%s-%s' % (base_slug, i)
i += 1
self.slug = slug
super(MeetingType, self).save(*args, **kwargs)
@classmethod
def import_json(cls, data):
return cls(**data)
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'duration': self.duration,
}
class Event(models.Model):
agenda = models.ForeignKey(Agenda)
start_datetime = models.DateTimeField(_('Date/time'))
places = models.PositiveIntegerField(_('Places'))
waiting_list_places = models.PositiveIntegerField(
_('Places in waiting list'), default=0)
label = models.CharField(_('Label'), max_length=150, null=True, blank=True,
help_text=_('Optional label to identify this date.'))
full = models.BooleanField(default=False)
meeting_type = models.ForeignKey(MeetingType, null=True)
desk = models.ForeignKey('Desk', null=True)
class Meta:
ordering = ['agenda', 'start_datetime', 'label']
def __unicode__(self):
if self.label:
return self.label
return date_format(localtime(self.start_datetime), format='DATETIME_FORMAT')
def save(self, *args, **kwargs):
self.check_full()
return super(Event, self).save(*args, **kwargs)
def check_full(self):
self.full = bool(
(self.booked_places >= self.places and self.waiting_list_places == 0) or
(self.waiting_list_places and self.waiting_list >= self.waiting_list_places))
def in_bookable_period(self):
if localtime(now()).date() > localtime(self.start_datetime - datetime.timedelta(days=self.agenda.minimal_booking_delay)).date():
return False
if self.agenda.maximal_booking_delay and (
localtime(now()).date() <= localtime(self.start_datetime - datetime.timedelta(days=self.agenda.maximal_booking_delay)).date()):
return False
if self.start_datetime < now():
# past the event date, we may want in the future to allow for some
# extra late booking but it's forbidden for now.
return False
return True
@property
def booked_places(self):
return self.booking_set.filter(cancellation_datetime__isnull=True,
in_waiting_list=False).count()
@property
def waiting_list(self):
return self.booking_set.filter(cancellation_datetime__isnull=True,
in_waiting_list=True).count()
@property
def end_datetime(self):
return self.start_datetime + datetime.timedelta(minutes=self.meeting_type.duration)
def get_absolute_url(self):
return reverse('chrono-manager-event-edit', kwargs={'pk': self.id})
@classmethod
def import_json(cls, data):
data['start_datetime'] = make_aware(datetime.datetime.strptime(
data['start_datetime'], '%Y-%m-%d %H:%M:%S'))
return cls(**data)
def export_json(self):
return {
'start_datetime': self.start_datetime.strftime('%Y-%m-%d %H:%M:%S'),
'places': self.places,
'waiting_list_places': self.waiting_list_places,
'label': self.label
}
class Booking(models.Model):
event = models.ForeignKey(Event)
extra_data = JSONField(null=True)
cancellation_datetime = models.DateTimeField(null=True)
in_waiting_list = models.BooleanField(default=False)
creation_datetime = models.DateTimeField(auto_now_add=True)
# primary booking is used to group multiple bookings together
primary_booking = models.ForeignKey('self', null=True,
on_delete=models.CASCADE, related_name='secondary_booking_set')
def save(self, *args, **kwargs):
with transaction.atomic():
super(Booking, self).save(*args, **kwargs)
initial_value = self.event.full
self.event.check_full()
if self.event.full != initial_value:
self.event.save()
def cancel(self):
timestamp = now()
with transaction.atomic():
self.secondary_booking_set.update(cancellation_datetime=timestamp)
self.cancellation_datetime = timestamp
self.save()
def accept(self):
self.in_waiting_list = False
with transaction.atomic():
self.secondary_booking_set.update(in_waiting_list=False)
self.save()
class Desk(models.Model):
agenda = models.ForeignKey(Agenda)
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=150)
def __unicode__(self):
return self.label
class Meta:
ordering = ['label']
def save(self, *args, **kwargs):
if not self.slug:
base_slug = slugify(self.label)
slug = base_slug
i = 1
while True:
try:
Desk.objects.get(slug=slug, agenda=self.agenda)
except self.DoesNotExist:
break
slug = '%s-%s' % (base_slug, i)
i += 1
self.slug = slug
super(Desk, self).save(*args, **kwargs)
@classmethod
def import_json(cls, data):
timeperiods = data.pop('timeperiods')
exceptions = data.pop('exceptions')
instance, created = cls.objects.get_or_create(**data)
for timeperiod in timeperiods:
timeperiod['desk'] = instance
TimePeriod.import_json(timeperiod).save()
for exception in exceptions:
exception['desk'] = instance
TimePeriodException.import_json(exception).save()
return instance
def export_json(self):
return {'label': self.label,
'slug': self.slug,
'timeperiods': [time_period.export_json() for time_period in self.timeperiod_set.all()],
'exceptions': [exception.export_json() for exception in self.timeperiodexception_set.all()]
}
def get_exceptions_within_two_weeks(self):
in_two_weeks = datetime.datetime.today() + datetime.timedelta(days=14)
exceptions = self.timeperiodexception_set.filter(
end_datetime__gte=now, end_datetime__lte=in_two_weeks)
if exceptions.exists():
return exceptions
# if none found within the 2 coming weeks, return the next one
next_exception = self.timeperiodexception_set.filter(
start_datetime__gte=now()).order_by('start_datetime').first()
if next_exception:
return [next_exception]
return []
def are_all_exceptions_displayed(self):
in_two_weeks = self.get_exceptions_within_two_weeks()
return self.timeperiodexception_set.count() == len(in_two_weeks)
def create_timeperiod_exceptions_from_ics(self, data):
try:
parsed = vobject.readOne(data)
except vobject.base.ParseError:
raise ICSError(_('File format is invalid.'))
total_created = 0
if not parsed.contents.get('vevent'):
raise ICSError(_('The file doesn\'t contain any events.'))
with transaction.atomic():
for vevent in parsed.contents['vevent']:
event = {}
summary = unicode(vevent.contents['summary'][0].value, 'utf-8')
if 'rrule' in vevent.contents:
raise ICSError(_('Recurrent events are not handled.'))
try:
start_dt = vevent.dtstart.value
if not isinstance(start_dt, datetime.datetime):
start_dt = datetime.datetime.combine(start_dt,
datetime.datetime.min.time())
event['start_datetime'] = start_dt
except AttributeError:
raise ICSError(_('Event "%s" has no start date.') % summary)
try:
end_dt = vevent.dtend.value
if not isinstance(end_dt, datetime.datetime):
end_dt = datetime.datetime.combine(end_dt,
datetime.datetime.min.time())
except AttributeError:
# events without end date are considered as ending the same day
end_dt = datetime.datetime.combine(start_dt, datetime.datetime.max.time())
event['end_datetime'] = end_dt
obj, created = TimePeriodException.objects.get_or_create(desk=self, label=summary,
**event)
if created:
total_created += 1
return total_created
class TimePeriodException(models.Model):
desk = models.ForeignKey(Desk)
label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
start_datetime = models.DateTimeField(_('Exception start time'))
end_datetime = models.DateTimeField(_('Exception end time'))
class Meta:
ordering = ['start_datetime']
def __unicode__(self):
date_format = get_format('SHORT_DATETIME_FORMAT')
if is_midnight(self.start_datetime) or is_midnight(self.end_datetime):
date_format = get_format('SHORT_DATE_FORMAT')
exc_repr = u'%s%s' % (DateFormat(localtime(self.start_datetime)).format(date_format),
DateFormat(localtime(self.end_datetime)).format(date_format))
if self.label:
exc_repr = u'%s (%s)' % (self.label, exc_repr)
return exc_repr
def clean(self):
super(TimePeriodException, self).clean()
if self.has_booking_within_time_slot():
raise ValidationError(_('One or several bookings exists within this time slot.'))
def has_booking_within_time_slot(self):
for event in Event.objects.filter(agenda=self.desk.agenda, booking__isnull=False):
if self.start_datetime <= event.start_datetime < self.end_datetime:
return True
return False
@classmethod
def import_json(cls, data):
data['start_datetime'] = make_aware(datetime.datetime.strptime(
data['start_datetime'], '%Y-%m-%d %H:%M:%S'))
data['end_datetime'] = make_aware(datetime.datetime.strptime(
data['end_datetime'], '%Y-%m-%d %H:%M:%S'))
return cls(**data)
def export_json(self):
return {
'label': self.label,
'start_datetime': self.start_datetime.strftime('%Y-%m-%d %H:%M:%S'),
'end_datetime': self.end_datetime.strftime('%Y-%m-%d %H:%M:%S'),
}