501 lines
16 KiB
Python
501 lines
16 KiB
Python
import pytest
|
|
import datetime
|
|
import mock
|
|
import requests
|
|
|
|
|
|
from django.contrib.auth.models import Group
|
|
from django.core.files.base import ContentFile
|
|
from django.core.management import call_command
|
|
from django.utils.timezone import localtime, make_aware, now
|
|
|
|
from chrono.agendas.models import (
|
|
Agenda,
|
|
Booking,
|
|
Desk,
|
|
Event,
|
|
ICSError,
|
|
MeetingType,
|
|
TimePeriodException,
|
|
TimePeriodExceptionSource,
|
|
)
|
|
|
|
# Module-wide pytest marker: applies ``django_db`` to every test in this file.
pytestmark = pytest.mark.django_db
|
|
|
|
ICS_SAMPLE = """BEGIN:VCALENDAR
|
|
VERSION:2.0
|
|
PRODID:-//foo.bar//EN
|
|
BEGIN:VEVENT
|
|
DTSTAMP:20170824T082855Z
|
|
DTSTART:20170831T170800Z
|
|
DTEND:20170831T203400Z
|
|
SEQUENCE:1
|
|
SUMMARY:Événement 1
|
|
END:VEVENT
|
|
BEGIN:VEVENT
|
|
DTSTAMP:20170824T092855Z
|
|
DTSTART:20170830T180800Z
|
|
DTEND:20170831T223400Z
|
|
SEQUENCE:2
|
|
END:VEVENT
|
|
END:VCALENDAR"""
|
|
|
|
ICS_SAMPLE_WITH_DURATION = """BEGIN:VCALENDAR
|
|
VERSION:2.0
|
|
PRODID:-//foo.bar//EN
|
|
BEGIN:VEVENT
|
|
DTSTAMP:20170824T082855Z
|
|
DTSTART:20170831T170800Z
|
|
DURATION:PT3H26M
|
|
SEQUENCE:1
|
|
SUMMARY:Event 1
|
|
END:VEVENT
|
|
BEGIN:VEVENT
|
|
DTSTAMP:20170824T092855Z
|
|
DTSTART:20170830T180800Z
|
|
DURATION:P1D4H26M
|
|
SEQUENCE:2
|
|
SUMMARY:Event 2
|
|
END:VEVENT
|
|
END:VCALENDAR"""
|
|
|
|
ICS_SAMPLE_WITH_RECURRENT_EVENT = """BEGIN:VCALENDAR
|
|
VERSION:2.0
|
|
PRODID:-//foo.bar//EN
|
|
BEGIN:VEVENT
|
|
DTSTAMP:20170720T145803Z
|
|
DESCRIPTION:Vacances d'ete
|
|
DTSTART;VALUE=DATE:20180101
|
|
DTEND;VALUE=DATE:20180101
|
|
SUMMARY:reccurent event
|
|
END:VEVENT
|
|
BEGIN:VEVENT
|
|
DTSTAMP:20180824T082855Z
|
|
DTSTART:20180101
|
|
DTEND:20180101
|
|
SUMMARY:New Year's Eve
|
|
RRULE:FREQ=YEARLY
|
|
END:VEVENT
|
|
END:VCALENDAR"""
|
|
|
|
ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST = """BEGIN:VCALENDAR
|
|
VERSION:2.0
|
|
PRODID:-//foo.bar//EN
|
|
BEGIN:VEVENT
|
|
DTSTAMP:20180824T082855Z
|
|
DTSTART:20120101
|
|
DURATION:PT24H
|
|
SUMMARY:New Year's Eve
|
|
RRULE:FREQ=YEARLY
|
|
END:VEVENT
|
|
END:VCALENDAR"""
|
|
|
|
ICS_SAMPLE_WITH_NO_EVENTS = """BEGIN:VCALENDAR
|
|
VERSION:2.0
|
|
PRODID:-//foo.bar//EN
|
|
END:VCALENDAR"""
|
|
|
|
INVALID_ICS_SAMPLE = """content
|
|
"""
|
|
|
|
|
|
# Load an ICS fixture from disk at import time. The path is relative, so it is
# resolved against the current working directory of the test run.
with open('tests/data/atreal.ics') as f:
    ICS_ATREAL = f.read()
|
|
|
|
|
|
def test_slug():
    """Saving an agenda without a slug yields one built from its label."""
    foo_bar = Agenda(label=u'Foo bar')
    foo_bar.save()
    assert 'foo-bar' == foo_bar.slug
|
|
|
|
|
|
def test_existing_slug():
    """A slug supplied explicitly is still in place after saving."""
    preset = Agenda(slug='bar', label=u'Foo bar')
    preset.save()
    assert 'bar' == preset.slug
|
|
|
|
|
|
def test_duplicate_slugs():
    """Agendas sharing a label get numbered slugs: base, base-1, base-2."""
    for expected_slug in ('foo-baz', 'foo-baz-1', 'foo-baz-2'):
        # Each iteration saves one more agenda with the very same label.
        agenda = Agenda(label=u'Foo baz')
        agenda.save()
        assert agenda.slug == expected_slug
|
|
|
|
|
|
def test_event_manager():
    """``booked_places`` on a fetched event drops to 0 once its booking is cancelled."""
    owner = Agenda(label=u'Foo baz')
    owner.save()
    slot = Event(agenda=owner, places=10, start_datetime=now())
    slot.save()

    reservation = Booking(event=slot)
    reservation.save()
    fetched = Event.objects.all()[0]
    assert fetched.booked_places == 1

    # Setting a cancellation date marks the booking as cancelled.
    reservation.cancellation_datetime = now()
    reservation.save()
    fetched = Event.objects.all()[0]
    assert fetched.booked_places == 0
|
|
|
|
|
|
def test_event_bookable_period():
    """Check ``Event.in_bookable_period`` against the agenda's min/max booking delays."""
    calendar = Agenda(label=u'Foo bar')
    calendar.save()

    in_ten_days = Event(
        agenda=calendar,
        places=10,
        start_datetime=now() + datetime.timedelta(days=10),
    )
    in_ten_days.save()
    assert in_ten_days.in_bookable_period() is True

    # The delays below are changed on the in-memory agenda only (not saved).
    calendar.minimal_booking_delay = 20
    assert in_ten_days.in_bookable_period() is False

    calendar.minimal_booking_delay = 1
    calendar.maximal_booking_delay = 5
    assert in_ten_days.in_bookable_period() is False

    calendar.maximal_booking_delay = 20
    assert in_ten_days.in_bookable_period() is True

    # Same-day events: with a zero minimal delay, only the start time matters.
    calendar.minimal_booking_delay = 0
    calendar.save()

    ten_minutes = datetime.timedelta(minutes=10)

    starting_soon = Event(agenda=calendar, places=10, start_datetime=now() + ten_minutes)
    starting_soon.save()
    assert starting_soon.in_bookable_period() is True

    already_started = Event(agenda=calendar, places=10, start_datetime=now() - ten_minutes)
    already_started.save()
    assert already_started.in_bookable_period() is False
|
|
|
|
|
|
def test_meeting_type_slugs():
    """Meeting type slugs are de-duplicated per agenda, not globally."""
    first_agenda = Agenda(label=u'Foo bar')
    first_agenda.save()
    second_agenda = Agenda(label=u'Foo bar second')
    second_agenda.save()

    # Two identical labels in the first agenda, then the same label in another one.
    scenarios = (
        (first_agenda, 'baz'),
        (first_agenda, 'baz-1'),
        (second_agenda, 'baz'),
    )
    for owner, expected_slug in scenarios:
        meeting_type = MeetingType(agenda=owner, label=u'Baz')
        meeting_type.save()
        assert meeting_type.slug == expected_slug
|
|
|
|
|
|
def test_timeperiodexception_creation_from_ics():
    """Importing ICS_SAMPLE on a desk reports and stores two exceptions."""
    agenda = Agenda(label=u'Test 1 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 1 desk')
    desk.save()

    ics_file = ContentFile(ICS_SAMPLE, name='sample.ics')
    created = desk.import_timeperiod_exceptions_from_ics_file(ics_file)

    assert created == 2
    stored = TimePeriodException.objects.filter(desk=desk)
    assert stored.count() == 2
|
|
|
|
|
|
def test_timeperiodexception_creation_from_ics_without_startdt():
    """An event lacking DTSTART makes the import raise ICSError naming that event."""
    agenda = Agenda(label=u'Test 2 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 2 desk')
    desk.save()

    # Strip every start datetime from the sample calendar.
    kept = [row for row in ICS_SAMPLE.splitlines() if not row.startswith('DTSTART:')]
    ics_sample = ContentFile("\n".join(kept), name='sample.ics')

    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
    assert str(excinfo.value) == 'Event "Événement 1" has no start date.'
|
|
|
|
|
|
def test_timeperiodexception_creation_from_ics_without_enddt():
    """Events lacking DTEND are imported with an end at the last microsecond of the day."""
    agenda = Agenda(label=u'Test 3 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 3 desk')
    desk.save()

    # Strip every end datetime from the sample calendar.
    kept = [row for row in ICS_SAMPLE.splitlines() if not row.startswith('DTEND:')]
    ics_sample = ContentFile("\n".join(kept), name='sample.ics')
    desk.import_timeperiod_exceptions_from_ics_file(ics_sample)

    last_instant = datetime.time(23, 59, 59, 999999)
    for stored in TimePeriodException.objects.filter(desk=desk):
        assert localtime(stored.end_datetime).time() == last_instant
|
|
|
|
|
|
@pytest.mark.freeze_time('2017-12-01')
def test_timeperiodexception_creation_from_ics_with_recurrences():
    """With time frozen at 2017-12-01, the recurrent sample yields three exceptions."""
    agenda = Agenda(label=u'Test 4 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 4 desk')
    desk.save()

    ics_file = ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics')
    created = desk.import_timeperiod_exceptions_from_ics_file(ics_file)

    assert created == 3
    assert TimePeriodException.objects.filter(desk=desk).count() == 3
|
|
|
|
|
|
def test_timeexception_creation_from_ics_with_dates():
    """Date-only events are stored with start and end at local midnight of that date."""
    agenda = Agenda(label=u'Test 5 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 5 desk')
    desk.save()

    # Drop the recurrence rule so each event is a single dated occurrence.
    kept = [
        row
        for row in ICS_SAMPLE_WITH_RECURRENT_EVENT.splitlines()
        if not row.startswith('RRULE:')
    ]
    ics_sample = ContentFile("\n".join(kept), name='sample.ics')

    created = desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
    assert created == 2

    midnight = make_aware(datetime.datetime(2018, 1, 1, 0, 0))
    for stored in TimePeriodException.objects.filter(desk=desk):
        assert localtime(stored.start_datetime) == midnight
        assert localtime(stored.end_datetime) == midnight
|
|
|
|
|
|
def test_timeexception_create_from_invalid_ics():
    """Content that is not a calendar is rejected with an 'invalid format' ICSError."""
    agenda = Agenda(label=u'Test 6 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 6 desk')
    desk.save()

    broken_file = ContentFile(INVALID_ICS_SAMPLE, name='sample.ics')
    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_ics_file(broken_file)
    assert 'File format is invalid.' == str(excinfo.value)
|
|
|
|
|
|
def test_timeexception_create_from_ics_with_no_events():
    """A calendar without any VEVENT is rejected with a dedicated ICSError."""
    agenda = Agenda(label=u'Test 7 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 7 desk')
    desk.save()

    empty_calendar = ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics')
    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_ics_file(empty_calendar)
    assert "The file doesn't contain any events." == str(excinfo.value)
|
|
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_remote_ics(mocked_get):
    """Remote import: a valid calendar creates exceptions, an empty one raises ICSError."""
    agenda = Agenda(label=u'Test 8 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 8 desk')
    desk.save()

    # First fetch: the mocked HTTP response carries the two-event sample.
    fake_response = mock.Mock(text=ICS_SAMPLE)
    mocked_get.return_value = fake_response
    created = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert created == 2
    labels = [stored.label for stored in desk.timeperiodexception_set.all()]
    assert 'Événement 1' in labels

    # Second fetch: same URL, but the payload now has no event.
    fake_response.text = ICS_SAMPLE_WITH_NO_EVENTS
    mocked_get.return_value = fake_response
    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert "The file doesn't contain any events." == str(excinfo.value)
|
|
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_remote_ics_encoding(mocked_get):
    """Remote import keeps the accented label when raw content is ISO-8859-15 bytes."""
    agenda = Agenda(label=u'Test 8 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 8 desk')
    desk.save()

    # The response exposes both raw bytes (latin-9 encoded) and decoded text.
    fake_response = mock.Mock(
        content=ICS_SAMPLE.encode('iso-8859-15'),
        text=ICS_SAMPLE,
    )
    mocked_get.return_value = fake_response

    created = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert created == 2
    labels = [stored.label for stored in desk.timeperiodexception_set.all()]
    assert 'Événement 1' in labels
|
|
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
    """A connection error during the fetch surfaces as ICSError with URL and reason."""
    agenda = Agenda(label=u'Test 9 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 9 desk')
    desk.save()

    fake_response = mock.Mock(text=ICS_SAMPLE)
    mocked_get.return_value = fake_response
    # An exception set as side_effect is raised on call, so the return value is never used.
    mocked_get.side_effect = requests.ConnectionError('unreachable')

    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert str(excinfo.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)."
|
|
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
    """An HTTP 403 during the fetch surfaces as ICSError mentioning the status code."""
    agenda = Agenda(label=u'Test 10 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 10 desk')
    desk.save()

    forbidden = mock.Mock(status_code=403)
    mocked_get.return_value = forbidden
    # The HTTPError carries the 403 response so its status can be reported.
    mocked_get.side_effect = requests.HTTPError(response=forbidden)

    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    message = str(excinfo.value)
    assert message == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)."
|
|
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
    """The sync command reports a failing remote calendar on stderr."""
    agenda = Agenda(label=u'Test 11 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 11 desk')
    desk.save()
    TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')

    forbidden = mock.Mock(status_code=403)
    mocked_get.return_value = forbidden
    # Every fetch attempt fails with an HTTP 403 error.
    mocked_get.side_effect = requests.HTTPError(response=forbidden)

    call_command('sync_desks_timeperiod_exceptions')

    _, stderr = capsys.readouterr()
    expected = 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403).\n'
    assert stderr == expected
|
|
|
|
|
|
def test_base_meeting_duration():
    """Check ``get_base_meeting_duration`` as meeting types are added to an agenda."""
    agenda = Agenda(label='Meeting', kind='meetings')
    agenda.save()

    # With no meeting type at all, there is nothing to compute from.
    with pytest.raises(ValueError):
        agenda.get_base_meeting_duration()

    # (label, duration of the added type, expected base duration afterwards)
    steps = (
        ('Foo', 30, 30),
        ('Bar', 60, 30),
        ('Bar', 45, 15),
    )
    for label, minutes, expected_base in steps:
        added = MeetingType(agenda=agenda, label=label, duration=minutes)
        added.save()
        assert agenda.get_base_meeting_duration() == expected_base
|
|
|
|
|
|
def test_timeperiodexception_creation_from_ics_with_duration():
    """Events defined with DURATION are imported with the expected start and end."""
    agenda = Agenda(label=u'Test 1 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 1 desk')
    desk.save()

    ics_file = ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
    created = desk.import_timeperiod_exceptions_from_ics_file(ics_file)
    assert created == 2
    assert TimePeriodException.objects.filter(desk=desk).count() == 2

    # Start datetimes come straight from DTSTART.
    expected_starts = {
        make_aware(datetime.datetime(2017, 8, 31, 19, 8, 0)),
        make_aware(datetime.datetime(2017, 8, 30, 20, 8, 0)),
    }
    starts = set(TimePeriodException.objects.values_list('start_datetime', flat=True))
    assert starts == expected_starts

    # End datetimes are DTSTART shifted by DURATION.
    expected_ends = {
        make_aware(datetime.datetime(2017, 8, 31, 22, 34, 0)),
        make_aware(datetime.datetime(2017, 9, 1, 0, 34, 0)),
    }
    ends = set(TimePeriodException.objects.values_list('end_datetime', flat=True))
    assert ends == expected_ends
|
|
|
|
|
|
@pytest.mark.freeze_time('2017-12-01')
def test_timeperiodexception_creation_from_ics_with_recurrences_in_the_past():
    """Occurrences of a recurring event before the frozen date are not created.

    Also covers a recurring event defined with DURATION rather than DTEND.
    """
    agenda = Agenda(label=u'Test 4 agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test 4 desk')
    desk.save()

    ics_file = ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics')
    created = desk.import_timeperiod_exceptions_from_ics_file(ics_file)
    assert created == 2
    assert TimePeriodException.objects.filter(desk=desk).count() == 2

    # Only the 2018 and 2019 occurrences exist although the series starts in 2012.
    expected_starts = {
        make_aware(datetime.datetime(2018, 1, 1)),
        make_aware(datetime.datetime(2019, 1, 1)),
    }
    starts = set(TimePeriodException.objects.values_list('start_datetime', flat=True))
    assert starts == expected_starts
|
|
|
|
|
|
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
    """Importing the on-disk atreal fixture returns a truthy exception count."""
    agenda = Agenda(label=u'Test atreal agenda')
    agenda.save()
    desk = Desk(agenda=agenda, label='Test atreal desk')
    desk.save()

    ics_file = ContentFile(ICS_ATREAL, name='sample.ics')
    created = desk.import_timeperiod_exceptions_from_ics_file(ics_file)
    assert created
|
|
|
|
|
|
def test_management_role_deletion():
    """Deleting the group used as an agenda's roles leaves the agenda with null roles.

    The agenda must still be retrievable after its role group is deleted, and
    both ``view_role`` and ``edit_role`` must then be ``None``.
    """
    group = Group(name=u'Group')
    group.save()
    agenda = Agenda(label=u'Test agenda', edit_role=group, view_role=group)
    agenda.save()

    Group.objects.all().delete()

    # Re-read from the database; the in-memory instance still references the group.
    reloaded = Agenda.objects.get(id=agenda.id)
    # These checks were bare expressions before (no ``assert``), so they verified nothing.
    assert reloaded.view_role is None
    assert reloaded.edit_role is None
|
|
|
|
|
|
def test_event_bookings_annotation():
    """``Event.annotate_queryset`` counts booked and waiting-list places per event.

    Cancelled bookings must be excluded from both counters.
    """
    agenda = Agenda(label='test', kind='events')
    agenda.save()
    event = Event(start_datetime=now(), label='foo', places=10, waiting_list_places=10, agenda=agenda)
    event.save()
    event2 = Event(start_datetime=now(), label='bar', places=10, waiting_list_places=10, agenda=agenda)
    event2.save()

    # 'foo': 2 active bookings, 1 cancelled, 1 waiting, 1 cancelled while waiting.
    Booking(event=event).save()
    Booking(event=event).save()
    Booking(event=event, cancellation_datetime=now()).save()
    Booking(event=event, in_waiting_list=True).save()
    Booking(event=event, in_waiting_list=True, cancellation_datetime=now()).save()

    # 'bar': 3 active bookings, nobody waiting.
    Booking(event=event2).save()
    Booking(event=event2).save()
    Booking(event=event2).save()

    annotated = {
        item.label: item for item in Event.annotate_queryset(Event.objects.filter(agenda=agenda))
    }
    # Require both events explicitly so the test cannot pass on an empty or
    # partial queryset (the former if/elif loop skipped silently in that case).
    assert set(annotated) == {'foo', 'bar'}

    assert annotated['foo'].booked_places_count == 2
    assert annotated['foo'].waiting_list_count == 1
    assert annotated['bar'].booked_places_count == 3
    assert annotated['bar'].waiting_list_count == 0
|