combo/combo/apps/calendar/utils.py

242 lines
7.8 KiB
Python

# combo - content management system
# Copyright (C) 2017 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import math
import urllib
from django.conf import settings
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
from django.utils.dateparse import parse_datetime
from django.utils.timezone import localtime, make_aware
from django.utils.translation import ugettext_lazy as _
from combo.utils import requests
def get_services(service_name):
    """Return the configuration stored for one kind of remote service.

    The value is read from ``settings.KNOWN_SERVICES[service_name]``.  An
    empty dict is returned when the setting is not defined, or when nothing
    (or only a falsy value) is registered under ``service_name``.
    """
    if not hasattr(settings, 'KNOWN_SERVICES'):
        return {}
    configured = settings.KNOWN_SERVICES.get(service_name)
    return configured if configured else {}
def get_wcs_services():
    """Return the services registered under the 'wcs' name (may be empty)."""
    wcs_services = get_services('wcs')
    return wcs_services
def get_chrono_service():
    """Return the first non-secondary 'chrono' service, or an empty dict.

    The returned dict is the configuration entry itself, augmented with a
    'slug' key holding the name the service is registered under.
    """
    # .items() rather than .iteritems(): the latter only exists on Python 2
    # dicts, while .items() is available on both Python 2 and Python 3.
    for chrono_key, chrono_site in get_services('chrono').items():
        # An entry without a 'secondary' flag is treated as secondary and
        # therefore skipped.
        if not chrono_site.get('secondary', True):
            # This writes into the dict obtained from the settings, so the
            # slug stays attached to that entry afterwards.
            chrono_site['slug'] = chrono_key
            return chrono_site
    return {}
def is_chrono_enabled():
    """Tell whether get_chrono_service() finds a usable chrono service."""
    return True if get_chrono_service() else False
def is_wcs_enabled():
    """Tell whether at least one 'wcs' service is configured."""
    return True if get_wcs_services() else False
def get_agendas():
    """Return the agendas exposed by the chrono service.

    The result is a list of ``(reference, label)`` tuples, where
    ``reference`` is ``'<chrono slug>:<agenda id>'`` and ``label`` is the
    agenda's 'text' value.  An empty list is returned when the response body
    is not valid JSON or carries no agenda list.
    """
    chrono = get_chrono_service()
    response = requests.get('api/agenda/', remote_service=chrono, without_user=True)
    try:
        result = response.json()
    except ValueError:
        return []
    # A payload without a 'data' key (or with a null one) yields no agendas
    # instead of failing while iterating over None.
    agendas = result.get('data') or []
    return [
        ('%s:%s' % (chrono['slug'], agenda['id']), agenda['text'])
        for agenda in agendas
    ]
def get_chrono_events(agenda_reference, synchronous):
    """Fetch the datetimes of an agenda from the chrono service.

    ``agenda_reference`` has the form ``'<chrono key>:<agenda slug>'``; only
    the slug part is used to build the request.  ``synchronous`` is passed on
    to the request as ``raise_if_not_cached``.

    Returns the decoded JSON payload, or a dict with a single 'error' key
    when the response status is not 200 or the body is not valid JSON.
    """
    _chrono_key, agenda_slug = agenda_reference.split(':')
    chrono_service = get_chrono_service()
    response = requests.get(
        'api/agenda/%s/datetimes/' % agenda_slug,
        remote_service=chrono_service,
        without_user=True,
        raise_if_not_cached=synchronous)
    if response.status_code == 200:
        try:
            return response.json()
        except ValueError:
            # undecodable body: fall through to the common error answer
            pass
    return {'error': _('An error occurred while retrieving calendar\'s availabilities.')}
def get_calendar_context_vars(request, cell, events_data):
    """Build the template context used to display a calendar cell.

    The requested page is read from the ``chunk_<cell.pk>`` GET parameter
    (defaulting to 1).  When ``events_data`` carries an 'error' key it is
    returned untouched.  Otherwise the result holds the calendar, the
    requested page of its computed days and its slots.
    """
    requested_page = request.GET.get('chunk_%s' % cell.pk, 1)
    if 'error' in events_data:
        return events_data

    calendar = get_calendar(
        events_data['data'],
        cell.slot_duration,
        cell.days_displayed,
        cell.minimal_booking_duration)
    paginator = Paginator(calendar.get_computed_days(), cell.days_displayed)
    try:
        days_page = paginator.page(requested_page)
    except PageNotAnInteger:
        # not a number at all: show the first page
        days_page = paginator.page(1)
    except EmptyPage:
        # rejected as an empty page: show the last one
        days_page = paginator.page(paginator.num_pages)

    context = {}
    context['calendar'] = calendar
    context['calendar_days'] = days_page
    context['calendar_slots'] = calendar.get_slots()
    return context
def get_calendar(events, offset, days_displayed, min_duration):
    """Build a Calendar from a list of event dicts.

    Every event must have a 'datetime' string; events are grouped into one
    WeekDay per date, in order of first appearance.  A slot is marked
    available only when its event has an explicit falsy 'disabled' value;
    a missing 'disabled' key counts as disabled.
    """
    calendar = Calendar(offset, days_displayed, min_duration)
    for event in events:
        starts_at = parse_datetime(event['datetime'])
        event_date = starts_at.date()
        day = calendar.get_day(event_date)
        if day is None:
            # first event seen for this date
            day = WeekDay(event_date)
            calendar.days.append(day)
        is_available = not event.get('disabled', True)
        day.add_slots(DaySlot(starts_at, is_available))
    return calendar
def get_form_url_with_params(cell, data):
    """Return the URL of the cell's form, with the booking as query string.

    ``cell.agenda_reference`` and ``cell.formdef_reference`` are both
    ``'<key>:<slug>'`` strings; ``data`` must provide 'start' and 'end'
    values exposing ``isoformat()``.  The query string carries the agenda
    slug and both boundaries as ``session_var_booking_*`` parameters.
    """
    # urlencode lives in urllib.parse on Python 3 and directly in urllib on
    # Python 2; importing it here keeps the function usable on both.
    try:
        from urllib.parse import urlencode
    except ImportError:
        from urllib import urlencode

    session_vars = {
        "session_var_booking_agenda_slug": cell.agenda_reference.split(':')[1],
        "session_var_booking_start": data['start'].isoformat(),
        "session_var_booking_end": data['end'].isoformat(),
    }
    wcs_key, wcs_slug = cell.formdef_reference.split(':')
    # NOTE(review): .get() yields None for an unknown key, which makes the
    # subscript below fail; callers presumably only pass configured keys.
    wcs = get_wcs_services().get(wcs_key)
    return '%s%s/?%s' % (wcs['url'], wcs_slug, urlencode(session_vars))
class DaySlot(object):
    """A single slot of a day, identified by its starting datetime."""

    def __init__(self, date_time, available, exist=True):
        """Store the slot start (converted to local time) and its flags.

        ``available`` tells whether the slot can be picked; ``exist`` is
        False for placeholder slots that do not come from an actual event.
        """
        # make_aware() refuses datetimes that already carry a timezone, so
        # only naive values go through it; aware ones are converted as is.
        if date_time.utcoffset() is None:
            date_time = make_aware(date_time)
        self.date_time = localtime(date_time)
        self.available = available
        self.exist = exist

    def __repr__(self):
        return '<DaySlot date_time=%s - available=%s>' % (
            self.date_time.isoformat(), self.available)

    @property
    def label(self):
        """ISO 8601 representation of the slot start."""
        return '%s' % self.date_time.isoformat()
class WeekDay(object):
    """One calendar date together with the slots registered for it."""

    def __init__(self, date):
        self.slots = []
        self.date = date

    def __repr__(self):
        iso_date = self.date.isoformat()
        return '<WeekDay %s >' % iso_date

    @staticmethod
    def _time_of_day(slot):
        """Return the time part of a slot's datetime (used as sort key)."""
        return slot.date_time.time()

    def add_slots(self, slot):
        """Append ``slot`` unless the list already contains it."""
        # NOTE(review): membership relies on equality between slots; DaySlot
        # defines no __eq__ in this file, so this looks like it only filters
        # out the very same object -- confirm that is the intent.
        if slot in self.slots:
            return
        self.slots.append(slot)

    def get_slot(self, slot_time):
        """Return the slot starting at ``slot_time``.

        When no registered slot starts at that time, a placeholder DaySlot
        (unavailable, ``exist=False``) is built for this day and that time.
        """
        found = next(
            (known for known in self.slots
             if known.date_time.time() == slot_time),
            None)
        if found is not None:
            return found
        placeholder_start = datetime.datetime.combine(self.date, slot_time)
        return DaySlot(placeholder_start, False, exist=False)

    def get_minimum_slot(self):
        """Return the slot with the earliest time of day."""
        return min(self.slots, key=self._time_of_day)

    def get_maximum_slot(self):
        """Return the slot with the latest time of day."""
        return max(self.slots, key=self._time_of_day)
class Calendar(object):
    """Set of WeekDay objects plus the settings used to lay them out."""

    def __init__(self, offset, days_displayed, min_duration):
        # timedelta separating the start of two consecutive slots
        self.offset = offset
        # number of days shown at once; also the chunk size of computed days
        self.days_displayed = days_displayed
        # WeekDay objects, appended by the code building the calendar
        self.days = []
        # timedelta a booking must at least span
        self.min_duration = min_duration

    def __repr__(self):
        return '<Calendar>'

    def get_first_available_slot(self):
        """return the first available slot that has enough
        consecutive available slots to be allowed for booking
        """
        # Floor division: the result feeds range() and a slice, which both
        # require an int (plain "/" gives a float on Python 3).
        required_contiguous_slots = self.min_duration.seconds // self.offset.seconds
        for day in self.days:
            slots = day.slots
            # NOTE(review): with this bound the window ending on the last
            # slot of the day is never examined; it looks like an off-by-one
            # but is kept as is -- confirm the intent before changing it.
            for idx in range(len(slots) - required_contiguous_slots):
                if all(x.available for x in slots[idx:idx + required_contiguous_slots]):
                    return slots[idx]
        return None

    def get_slots(self):
        """Yield every slot time from the earliest to the latest one.

        Times are ``datetime.time`` values spaced by ``self.offset``.
        Nothing is yielded when the calendar has no day.
        """
        if not self.days:
            return
        # Step on full datetimes: a bare time wraps around at midnight, so
        # comparing times would never end once the last slot plus the offset
        # goes past 24:00.  The date itself is irrelevant.
        anchor = datetime.date(2000, 1, 1)
        current = datetime.datetime.combine(anchor, self.get_minimum_slot())
        last = datetime.datetime.combine(anchor, self.get_maximum_slot())
        while current <= last:
            yield current.time()
            current += self.offset

    def get_computed_days(self):
        """Return consecutive dates covering the calendar's days.

        The list starts at the first day and its length is rounded up to a
        multiple of ``days_displayed`` so that it splits into equal chunks.
        """
        if not self.days:
            return []
        # presumably self.days is in chronological order -- the first and
        # last entries are taken as the boundaries; verify against callers
        base_day = self.days[0].date
        days_diff = (self.days[-1].date - base_day).days
        # find a number which ensures calendar days are equally chunked
        days_range = int(
            self.days_displayed * math.ceil(float(days_diff + 1) / self.days_displayed))
        return [base_day + datetime.timedelta(days=index) for index in range(days_range)]

    def get_day(self, date):
        """Return the WeekDay registered for ``date``, or None."""
        for day in self.days:
            if day.date == date:
                return day
        return None

    def has_day(self, date):
        """Tell whether a WeekDay is registered for ``date``."""
        return bool(self.get_day(date))

    def get_availability(self, slot):
        """Return the DaySlot matching the datetime ``slot``.

        A placeholder (unavailable, ``exist=False``) is returned when no day
        is registered for that date.
        """
        if not self.has_day(slot.date()):
            return DaySlot(slot, False, exist=False)
        day = self.get_day(slot.date())
        return day.get_slot(slot.time())

    def get_minimum_slot(self):
        """Return the earliest slot time of day found across all days."""
        return min([day.get_minimum_slot().date_time.time() for day in self.days])

    def get_maximum_slot(self):
        """Return the latest slot time of day found across all days."""
        return max([day.get_maximum_slot().date_time.time() for day in self.days])