352 lines
13 KiB
Python
352 lines
13 KiB
Python
import pytest
|
|
import datetime
|
|
import mock
|
|
import re
|
|
import requests
|
|
|
|
|
|
from django.utils.timezone import now, make_aware, localtime
|
|
from django.core.management import call_command
|
|
from django.core.management.base import CommandError
|
|
|
|
from chrono.agendas.models import (Agenda, Event, Booking, MeetingType,
|
|
Desk, TimePeriod, TimePeriodException, ICSError)
|
|
|
|
# Module-level pytest marker: applies ``django_db`` to every test collected
# from this file (presumably pytest-django's database-access marker -- the
# tests below all save model instances; confirm against the project setup).
pytestmark = pytest.mark.django_db
|
|
|
|
# Well-formed calendar with two plain (non-recurring) events, each carrying a
# UID, a start datetime and an end datetime.  The payload must contain only
# iCalendar content lines: stray "|" separator lines would become part of the
# string literal and corrupt the calendar.
ICS_SAMPLE = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20170824T082855Z
UID:8c4c219889d244232c0a565f4950c3ff65dd5d64
DTSTART:20170831T170800Z
DTEND:20170831T203400Z
SEQUENCE:1
SUMMARY:Event 1
END:VEVENT
BEGIN:VEVENT
DTSTAMP:20170824T092855Z
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
DTSTART:20170830T180800Z
DTEND:20170831T223400Z
SEQUENCE:2
SUMMARY:Event 2
END:VEVENT
END:VCALENDAR"""
|
|
|
|
# Calendar with two events dated 2018-01-01; the second one carries an RRULE
# (yearly frequency).  Only iCalendar content lines belong in the literal:
# stray "|" separator lines would end up inside the payload.
ICS_SAMPLE_WITH_RECURRENT_EVENT = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20170720T145803Z
DESCRIPTION:Vacances d'ete
DTSTART;VALUE=DATE:20180101
DTEND;VALUE=DATE:20180101
SUMMARY:reccurent event
END:VEVENT
BEGIN:VEVENT
DTSTAMP:20170824T082855Z
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
RRULE:FREQ=YEARLY
END:VEVENT
END:VCALENDAR"""
|
|
|
|
# Syntactically valid calendar envelope that contains no VEVENT at all.  The
# literal holds only iCalendar content lines (no stray "|" separator lines).
ICS_SAMPLE_WITH_NO_EVENTS = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
END:VCALENDAR"""
|
|
|
|
# Payload that is not an iCalendar document: a single word followed by a
# newline (no stray "|" separator lines inside the literal).
INVALID_ICS_SAMPLE = """content
"""
|
|
|
|
|
|
def test_slug():
    """An agenda saved with only a label ends up with the slug 'foo-bar'."""
    foo_bar = Agenda(label=u'Foo bar')
    foo_bar.save()
    generated_slug = foo_bar.slug
    assert generated_slug == 'foo-bar'
|
|
|
|
def test_existing_slug():
    """A slug given explicitly at construction is still there after save()."""
    preset = Agenda(label=u'Foo bar', slug='bar')
    preset.save()
    kept_slug = preset.slug
    assert kept_slug == 'bar'
|
|
|
|
def test_duplicate_slugs():
    """Agendas saved with the same label get distinct, numbered slugs."""
    # three agendas with an identical label are saved one after the other;
    # the expected slug of each is listed in creation order
    for expected_slug in ('foo-baz', 'foo-baz-1', 'foo-baz-2'):
        duplicate = Agenda(label=u'Foo baz')
        duplicate.save()
        assert duplicate.slug == expected_slug
|
|
|
|
def test_event_manager():
    """``booked_places`` on events fetched via ``Event.objects`` tracks bookings.

    One booking yields 1 booked place; once that booking has a cancellation
    datetime the count goes back to 0.
    """
    parent_agenda = Agenda(label=u'Foo baz')
    parent_agenda.save()
    new_event = Event(start_datetime=now(), places=10, agenda=parent_agenda)
    new_event.save()

    reservation = Booking(event=new_event)
    reservation.save()
    fetched = Event.objects.all()[0]
    assert fetched.booked_places == 1

    # cancel the booking and fetch the event again
    reservation.cancellation_datetime = now()
    reservation.save()
    fetched = Event.objects.all()[0]
    assert fetched.booked_places == 0
|
|
|
|
def test_event_bookable_period():
    """Check ``Event.in_bookable_period()`` against the agenda booking delays."""
    agenda = Agenda(label=u'Foo bar')
    agenda.save()
    in_ten_days = now() + datetime.timedelta(days=10)
    future_event = Event(start_datetime=in_ten_days, places=10, agenda=agenda)
    future_event.save()
    assert future_event.in_bookable_period() is True

    # the delays are modified on the in-memory agenda object (the one the
    # event was built with); it is not saved between these checks
    agenda.minimal_booking_delay = 20
    assert future_event.in_bookable_period() is False

    agenda.minimal_booking_delay = 1
    agenda.maximal_booking_delay = 5
    assert future_event.in_bookable_period() is False

    agenda.maximal_booking_delay = 20
    assert future_event.in_bookable_period() is True

    # special case for events that happen today
    agenda.minimal_booking_delay = 0
    agenda.save()
    ten_minutes = datetime.timedelta(minutes=10)

    upcoming_event = Event(start_datetime=now() + ten_minutes, places=10, agenda=agenda)
    upcoming_event.save()
    assert upcoming_event.in_bookable_period() is True

    started_event = Event(start_datetime=now() - ten_minutes, places=10, agenda=agenda)
    started_event.save()
    assert started_event.in_bookable_period() is False
|
|
|
|
def test_meeting_type_slugs():
    """Meeting type slugs are numbered within an agenda, not across agendas."""
    first_agenda = Agenda(label=u'Foo bar')
    first_agenda.save()
    second_agenda = Agenda(label=u'Foo bar second')
    second_agenda.save()

    # (owning agenda, slug expected after save), in creation order: the second
    # 'Baz' of the first agenda is suffixed, the one of the other agenda is not
    scenarios = (
        (first_agenda, 'baz'),
        (first_agenda, 'baz-1'),
        (second_agenda, 'baz'),
    )
    for owner, expected_slug in scenarios:
        meeting_type = MeetingType(agenda=owner, label=u'Baz')
        meeting_type.save()
        assert meeting_type.slug == expected_slug
|
|
|
|
def test_timeperiodexception_creation_from_ics():
    """Importing the two-event sample reports 2 and stores 2 exceptions."""
    parent_agenda = Agenda(label=u'Test 1 agenda')
    parent_agenda.save()
    target_desk = Desk(label='Test 1 desk', agenda=parent_agenda)
    target_desk.save()

    created = target_desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE)
    assert created == 2

    stored = TimePeriodException.objects.filter(desk=target_desk)
    assert stored.count() == 2
|
|
|
|
def test_timeperiodexception_creation_from_ics_without_startdt():
    """An ICS whose events have no DTSTART line is rejected with ``ICSError``."""
    parent_agenda = Agenda(label=u'Test 2 agenda')
    parent_agenda.save()
    target_desk = Desk(label='Test 2 desk', agenda=parent_agenda)
    target_desk.save()

    # strip every start datetime from the sample calendar
    kept_rows = [row for row in ICS_SAMPLE.splitlines() if not row.startswith('DTSTART:')]
    ics_without_start = "\n".join(kept_rows)

    with pytest.raises(ICSError) as excinfo:
        target_desk.create_timeperiod_exceptions_from_ics(ics_without_start)
    assert 'Event "Event 1" has no start date.' == str(excinfo.value)
|
|
|
|
def test_timeperiodexception_creation_from_ics_without_enddt():
    """Events lacking DTEND get an end time of 23:59:59.999999 (local time)."""
    parent_agenda = Agenda(label=u'Test 3 agenda')
    parent_agenda.save()
    target_desk = Desk(label='Test 3 desk', agenda=parent_agenda)
    target_desk.save()

    # strip every end datetime from the sample calendar
    kept_rows = [row for row in ICS_SAMPLE.splitlines() if not row.startswith('DTEND:')]
    ics_without_end = "\n".join(kept_rows)
    target_desk.create_timeperiod_exceptions_from_ics(ics_without_end)

    last_instant_of_day = datetime.time(23, 59, 59, 999999)
    for stored in TimePeriodException.objects.filter(desk=target_desk):
        local_end = localtime(stored.end_datetime)
        assert local_end.time() == last_instant_of_day
|
|
|
|
def test_timeperiodexception_creation_from_ics_with_recurrences():
    """A calendar containing an RRULE is rejected and nothing is stored."""
    parent_agenda = Agenda(label=u'Test 4 agenda')
    parent_agenda.save()
    target_desk = Desk(label='Test 4 desk', agenda=parent_agenda)
    target_desk.save()

    with pytest.raises(ICSError) as excinfo:
        target_desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT)
    assert 'Recurrent events are not handled.' == str(excinfo.value)

    # the failed import must not leave any exception behind for this desk
    leftovers = TimePeriodException.objects.filter(desk=target_desk)
    assert leftovers.count() == 0
|
|
|
|
def test_timeexception_creation_from_ics_with_dates():
    """Events dated 2018-01-01 (no time part) start and end at local midnight.

    The recurrent sample is reused with its RRULE line removed, so both of its
    events can be imported; each resulting exception is expected to start and
    end on 2018-01-01 00:00 in the current timezone.
    """
    agenda = Agenda(label=u'Test 5 agenda')
    agenda.save()
    desk = Desk(label='Test 5 desk', agenda=agenda)
    desk.save()

    # remove the recurrence rule from the ics
    ics_sample = "\n".join(
        [line for line in ICS_SAMPLE_WITH_RECURRENT_EVENT.splitlines()
         if not line.startswith('RRULE:')])

    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
    assert exceptions_count == 2

    # plain integer literals: zero-prefixed ones such as ``01`` are a syntax
    # error on Python 3
    expected_datetime = make_aware(datetime.datetime(2018, 1, 1, 0, 0))
    for exception in TimePeriodException.objects.filter(desk=desk):
        assert localtime(exception.start_datetime) == expected_datetime
        assert localtime(exception.end_datetime) == expected_datetime
|
|
|
|
def test_timeexception_create_from_invalid_ics():
    """Content that is not an ICS document raises ``ICSError``."""
    parent_agenda = Agenda(label=u'Test 6 agenda')
    parent_agenda.save()
    target_desk = Desk(label='Test 6 desk', agenda=parent_agenda)
    target_desk.save()

    with pytest.raises(ICSError) as excinfo:
        target_desk.create_timeperiod_exceptions_from_ics(INVALID_ICS_SAMPLE)
    message = str(excinfo.value)
    assert message == 'File format is invalid.'
|
|
|
|
def test_timeexception_create_from_ics_with_no_events():
    """A calendar without any VEVENT raises ``ICSError``."""
    parent_agenda = Agenda(label=u'Test 7 agenda')
    parent_agenda.save()
    target_desk = Desk(label='Test 7 desk', agenda=parent_agenda)
    target_desk.save()

    with pytest.raises(ICSError) as excinfo:
        target_desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS)
    message = str(excinfo.value)
    assert message == "The file doesn't contain any events."
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_remote_ics(mocked_get):
    """Import exceptions from a (mocked) remote ICS, then re-import changes.

    ``requests.get`` as seen by ``chrono.agendas.models`` is patched, so no
    network access happens.  Three payloads are served in turn: the two-event
    sample, the same sample with renamed events, and an empty calendar.
    """
    agenda = Agenda(label=u'Test 8 agenda')
    agenda.save()
    desk = Desk(label='Test 8 desk', agenda=agenda)
    desk.save()

    mocked_response = mock.Mock()
    mocked_response.text = ICS_SAMPLE
    mocked_get.return_value = mocked_response
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert exceptions_count == 2

    # Same calendar with renamed events.  ``flags`` must be passed by keyword:
    # the fourth positional parameter of re.sub() is ``count``, not ``flags``.
    mocked_response.text = re.sub(
        r'SUMMARY:\w+', 'SUMMARY:New summary', ICS_SAMPLE, flags=re.MULTILINE)
    mocked_get.return_value = mocked_response
    desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    # NOTE(review): this filter is an exact match on 'desk-<id>:'; if stored
    # external ids never have exactly that value the loop body does not run
    # and the label is not actually checked -- confirm the intended lookup.
    for timeperiod in TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id, desk=desk):
        assert 'New summary ' in timeperiod.label

    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
    mocked_get.return_value = mocked_response
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert exceptions_count == 0
    # the comparison has to be asserted, a bare expression checks nothing
    assert TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id).count() == 0
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
    """A connection error while fetching the remote ICS becomes ``ICSError``."""
    parent_agenda = Agenda(label=u'Test 9 agenda')
    parent_agenda.save()
    target_desk = Desk(label='Test 9 desk', agenda=parent_agenda)
    target_desk.save()

    fake_response = mock.Mock()
    fake_response.text = ICS_SAMPLE
    mocked_get.return_value = fake_response
    # an exception instance set as side_effect is raised when the mock is
    # called, so the return value configured above is never handed out
    mocked_get.side_effect = requests.ConnectionError('unreachable')

    with pytest.raises(ICSError) as excinfo:
        target_desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    message = str(excinfo.value)
    assert message == "Failed to retrieve remote calendar (unreachable)."
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
    """An HTTP 403 while fetching the remote ICS becomes ``ICSError``."""
    parent_agenda = Agenda(label=u'Test 10 agenda')
    parent_agenda.save()
    target_desk = Desk(label='Test 10 desk', agenda=parent_agenda)
    target_desk.save()

    forbidden_response = mock.Mock()
    forbidden_response.status_code = 403
    mocked_get.return_value = forbidden_response
    # an exception instance set as side_effect is raised when the mock is
    # called; the HTTPError carries the 403 response built above
    mocked_get.side_effect = requests.HTTPError(response=forbidden_response)

    with pytest.raises(ICSError) as excinfo:
        target_desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    message = str(excinfo.value)
    assert message == "Failed to retrieve remote calendar (HTTP error 403)."
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
    """The sync command reports a failing remote calendar on stderr.

    ``requests.get`` as seen by ``chrono.agendas.models`` is patched to raise
    an ``HTTPError`` carrying a 403 response; the management command is then
    expected to write one error line naming the desk to stderr.
    """
    agenda = Agenda(label=u'Test 11 agenda')
    agenda.save()
    # well-formed URL (scheme followed by "//"), as in the other remote tests
    desk = Desk(label='Test 11 desk', agenda=agenda,
                timeperiod_exceptions_remote_url='http://example.com/sample.ics')
    desk.save()

    mocked_response = mock.Mock()
    mocked_response.status_code = 403
    mocked_get.return_value = mocked_response
    # an exception instance set as side_effect is raised when the mock is called
    mocked_get.side_effect = requests.HTTPError(response=mocked_response)

    call_command('sync_desks_timeperiod_exceptions')
    out, err = capsys.readouterr()
    assert err == 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (HTTP error 403).\n'
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
    """Successive syncs make the desk's exceptions follow the remote calendar.

    ``requests.get`` as seen by ``chrono.agendas.models`` is patched.  The
    command is run three times, against the two-event sample, a calendar
    holding a single new event, and an empty calendar; after each run the
    stored exceptions of the desk must mirror the payload (2, then 1, then 0).
    """
    agenda = Agenda(label=u'Test 11 agenda')
    agenda.save()
    # well-formed URL (scheme followed by "//"), as in the other remote tests
    desk = Desk(label='Test 11 desk', agenda=agenda,
                timeperiod_exceptions_remote_url='http://example.com/sample.ics')
    desk.save()

    mocked_response = mock.Mock()
    mocked_response.text = ICS_SAMPLE
    mocked_get.return_value = mocked_response
    call_command('sync_desks_timeperiod_exceptions')
    assert TimePeriodException.objects.filter(desk=desk).count() == 2

    # the remote calendar now holds one event only, with a different UID; the
    # literal contains nothing but iCalendar content lines
    mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20180824T082855Z
UID:new-and-unique-uid
DTSTART:20180831T170800Z
DTEND:20180831T203400Z
SUMMARY:Wonderfull event
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = mocked_response
    call_command('sync_desks_timeperiod_exceptions')
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
    exception = TimePeriodException.objects.get(desk=desk)
    assert exception.external_id == 'new-and-unique-uid'

    # finally the remote calendar is empty
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
    mocked_get.return_value = mocked_response
    call_command('sync_desks_timeperiod_exceptions')
    assert not TimePeriodException.objects.filter(desk=desk).exists()
|
|
|
|
def test_base_meeting_duration():
    """``get_base_meeting_duration()`` follows the agenda's meeting types.

    Without any meeting type the call raises ``ValueError``.  Durations are
    then added one at a time; the expected results (30, 30, 15 for durations
    30, 60, 45) are consistent with a greatest common divisor -- presumably
    that is what the method computes, verify against the model.
    """
    meetings_agenda = Agenda(label='Meeting', kind='meetings')
    meetings_agenda.save()

    with pytest.raises(ValueError):
        meetings_agenda.get_base_meeting_duration()

    # (label, duration, base duration expected once this type is saved)
    steps = (
        ('Foo', 30, 30),
        ('Bar', 60, 30),
        ('Bar', 45, 15),
    )
    for label, duration, expected_base in steps:
        added_type = MeetingType(agenda=meetings_agenda, label=label, duration=duration)
        added_type.save()
        assert meetings_agenda.get_base_meeting_duration() == expected_base
|