1148 lines
43 KiB
Python
1148 lines
43 KiB
Python
import pytest
|
|
import datetime
|
|
import mock
|
|
import requests
|
|
|
|
|
|
from django.contrib.auth.models import Group
|
|
from django.core.files.base import ContentFile
|
|
from django.core.management import call_command
|
|
from django.test import override_settings
|
|
from django.utils.timezone import localtime, make_aware, now
|
|
|
|
from chrono.agendas.models import (
|
|
Agenda,
|
|
Booking,
|
|
Category,
|
|
Desk,
|
|
Event,
|
|
ICSError,
|
|
MeetingType,
|
|
Resource,
|
|
TimePeriod,
|
|
TimePeriodException,
|
|
TimePeriodExceptionSource,
|
|
VirtualMember,
|
|
EventCancellationReport,
|
|
)
|
|
|
|
# Apply the pytest-django ``django_db`` marker to every test in this module.
pytestmark = pytest.mark.django_db

# Calendar with two events defined by explicit DTSTART/DTEND values; only the
# first event carries a SUMMARY.
ICS_SAMPLE = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20170824T082855Z
DTSTART:20170831T170800Z
DTEND:20170831T203400Z
SEQUENCE:1
SUMMARY:Événement 1
END:VEVENT
BEGIN:VEVENT
DTSTAMP:20170824T092855Z
DTSTART:20170830T180800Z
DTEND:20170831T223400Z
SEQUENCE:2
END:VEVENT
END:VCALENDAR"""

# Same start datetimes as ICS_SAMPLE, but each event gives a DURATION instead
# of a DTEND.
ICS_SAMPLE_WITH_DURATION = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20170824T082855Z
DTSTART:20170831T170800Z
DURATION:PT3H26M
SEQUENCE:1
SUMMARY:Event 1
END:VEVENT
BEGIN:VEVENT
DTSTAMP:20170824T092855Z
DTSTART:20170830T180800Z
DURATION:P1D4H26M
SEQUENCE:2
SUMMARY:Event 2
END:VEVENT
END:VCALENDAR"""

# One single-day event using date-only values (VALUE=DATE) plus one event
# repeated by a yearly RRULE.
ICS_SAMPLE_WITH_RECURRENT_EVENT = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20170720T145803Z
DESCRIPTION:Vacances d'ete
DTSTART;VALUE=DATE:20180101
DTEND;VALUE=DATE:20180101
SUMMARY:reccurent event
END:VEVENT
BEGIN:VEVENT
DTSTAMP:20180824T082855Z
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
RRULE:FREQ=YEARLY
END:VEVENT
END:VCALENDAR"""

# A single yearly event whose first occurrence (2012) uses a 24-hour DURATION.
ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20180824T082855Z
DTSTART:20120101
DURATION:PT24H
SUMMARY:New Year's Eve
RRULE:FREQ=YEARLY
END:VEVENT
END:VCALENDAR"""

# Well-formed calendar that contains no VEVENT at all.
ICS_SAMPLE_WITH_NO_EVENTS = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
END:VCALENDAR"""

# Not an iCalendar document.
INVALID_ICS_SAMPLE = """content
"""

# NOTE(review): the path is relative to the current working directory, so the
# test suite presumably has to be started from the project root -- confirm.
with open('tests/data/atreal.ics') as f:
    ICS_ATREAL = f.read()
|
|
|
|
|
|
def test_slug():
    """An agenda created without a slug gets one derived from its label."""
    created = Agenda.objects.create(label=u'Foo bar')
    assert created.slug == 'foo-bar'
|
|
|
|
|
|
def test_existing_slug():
    """An explicitly provided agenda slug is kept as is."""
    created = Agenda.objects.create(label=u'Foo bar', slug='bar')
    assert created.slug == 'bar'
|
|
|
|
|
|
def test_duplicate_slugs():
    """Agendas sharing the same label get numbered slug suffixes."""
    for expected_slug in ('foo-baz', 'foo-baz-1', 'foo-baz-2'):
        created = Agenda.objects.create(label=u'Foo baz')
        assert created.slug == expected_slug
|
|
|
|
|
|
def test_resource_slug():
    """A resource created without a slug gets one derived from its label."""
    assert Resource.objects.create(label=u'Foo bar').slug == 'foo-bar'
|
|
|
|
|
|
def test_resource_existing_slug():
    """An explicitly provided resource slug is kept as is."""
    assert Resource.objects.create(label=u'Foo bar', slug='bar').slug == 'bar'
|
|
|
|
|
|
def test_resource_duplicate_slugs():
    """Resources sharing the same label get numbered slug suffixes."""
    for expected_slug in ('foo-baz', 'foo-baz-1', 'foo-baz-2'):
        created = Resource.objects.create(label=u'Foo baz')
        assert created.slug == expected_slug
|
|
|
|
|
|
def test_category_slug():
    """A category created without a slug gets one derived from its label."""
    assert Category.objects.create(label=u'Foo bar').slug == 'foo-bar'
|
|
|
|
|
|
def test_category_existing_slug():
    """An explicitly provided category slug is kept as is."""
    assert Category.objects.create(label=u'Foo bar', slug='bar').slug == 'bar'
|
|
|
|
|
|
def test_category_duplicate_slugs():
    """Categories sharing the same label get numbered slug suffixes."""
    for expected_slug in ('foo-baz', 'foo-baz-1', 'foo-baz-2'):
        created = Category.objects.create(label=u'Foo baz')
        assert created.slug == expected_slug
|
|
|
|
|
|
def test_event_slug():
    """Event slugs are generated from the label (or the agenda) and deduplicated per agenda."""
    # An event holding the same slug in another agenda must not influence the result.
    unrelated_agenda = Agenda.objects.create(label='Foo bar')
    Event.objects.create(agenda=unrelated_agenda, places=42, start_datetime=now(), slug='foo-bar')

    agenda = Agenda.objects.create(label='Foo baz')

    def new_event(**extra):
        # Create an event in the agenda under test, optionally with a label.
        return Event.objects.create(agenda=agenda, places=42, start_datetime=now(), **extra)

    # Labelled events: slug comes from the label, then gets a numeric suffix.
    for expected_slug in ('foo-bar', 'foo-bar-1', 'foo-bar-2'):
        assert new_event(label='Foo bar').slug == expected_slug

    # Unlabelled events: slug is built from the agenda slug.
    for expected_slug in ('foo-baz-event', 'foo-baz-event-1', 'foo-baz-event-2'):
        assert new_event().slug == expected_slug
|
|
|
|
|
|
def test_event_existing_slug():
    """An explicit event slug is kept even when another agenda already uses it."""
    unrelated_agenda = Agenda.objects.create(label='Foo bar')
    Event.objects.create(agenda=unrelated_agenda, places=42, start_datetime=now(), slug='bar')

    agenda = Agenda.objects.create(label='Foo baz')
    created = Event.objects.create(
        agenda=agenda, places=42, start_datetime=now(), label='Foo bar', slug='bar'
    )
    assert created.slug == 'bar'
|
|
|
|
|
|
def test_event_manager():
    """``booked_places`` on queried events stops counting a booking once it is cancelled."""
    agenda = Agenda.objects.create(label=u'Foo baz')
    event = Event.objects.create(start_datetime=now(), places=10, agenda=agenda)
    booking = Booking.objects.create(event=event)
    assert Event.objects.all()[0].booked_places == 1

    # Cancelling the booking frees the place.
    booking.cancellation_datetime = now()
    booking.save()
    assert Event.objects.all()[0].booked_places == 0
|
|
|
|
|
|
@pytest.mark.parametrize(
    'start_days, start_minutes, min_delay, max_delay, pub_days, expected',
    [
        # no delay
        (10, 0, 0, 0, None, True),
        # test publication_date
        (10, 0, 0, 0, 1, False),
        (10, 0, 0, 0, 0, True),
        # test min and max delays
        (10, 0, 20, 0, None, False),
        (10, 0, 1, 5, None, False),
        (10, 0, 1, 20, None, True),
        # special case for events that happen today
        (0, 10, 0, 20, None, True),
        (0, -10, 0, 20, None, False),
    ],
)
def test_event_bookable_period(start_days, start_minutes, min_delay, max_delay, pub_days, expected):
    """Check ``Event.in_bookable_period()`` against booking delays and publication date.

    ``start_days``/``start_minutes`` offset the event start from now,
    ``min_delay``/``max_delay`` are the agenda booking delays, and ``pub_days``
    offsets the publication date from today (``None`` means no publication date).
    """
    agenda = Agenda.objects.create(
        label=u'Foo bar', minimal_booking_delay=min_delay, maximal_booking_delay=max_delay
    )
    # Compare against None explicitly: pub_days == 0 means "published today" and
    # must not be mistaken for "no publication date".
    publication_date = None
    if pub_days is not None:
        publication_date = now().date() + datetime.timedelta(days=pub_days)
    event = Event.objects.create(
        start_datetime=now() + datetime.timedelta(days=start_days, minutes=start_minutes),
        publication_date=publication_date,
        places=10,
        agenda=agenda,
    )
    assert event.in_bookable_period() == expected
|
|
|
|
|
|
def test_meeting_type_slugs():
    """Meeting type slugs are deduplicated inside an agenda, not across agendas."""
    first_agenda = Agenda.objects.create(label=u'Foo bar')
    second_agenda = Agenda.objects.create(label=u'Foo bar second')

    scenario = [
        (first_agenda, 'baz'),
        (first_agenda, 'baz-1'),  # same agenda, same label: suffixed
        (second_agenda, 'baz'),  # other agenda: no clash
    ]
    for owner, expected_slug in scenario:
        meeting_type = MeetingType.objects.create(agenda=owner, label=u'Baz')
        assert meeting_type.slug == expected_slug
|
|
|
|
|
|
def test_timeperiodexception_creation_from_ics():
    """Importing ICS_SAMPLE on a desk reports and stores two exceptions."""
    agenda = Agenda.objects.create(label=u'Test 1 agenda')
    desk = Desk.objects.create(label='Test 1 desk', agenda=agenda)

    ics_file = ContentFile(ICS_SAMPLE, name='sample.ics')
    imported = desk.import_timeperiod_exceptions_from_ics_file(ics_file)

    assert imported == 2
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
|
|
|
|
|
def test_timeperiodexception_creation_from_ics_without_startdt():
    """An event lacking DTSTART makes the import fail with an explicit ICSError."""
    agenda = Agenda.objects.create(label=u'Test 2 agenda')
    desk = Desk.objects.create(label='Test 2 desk', agenda=agenda)

    # Drop every start datetime from the sample calendar.
    kept_lines = [row for row in ICS_SAMPLE.splitlines() if not row.startswith('DTSTART:')]
    ics_sample = ContentFile("\n".join(kept_lines), name='sample.ics')

    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
    assert 'Event "Événement 1" has no start date.' == str(excinfo.value)
|
|
|
|
|
|
def test_timeperiodexception_creation_from_ics_without_enddt():
    """Events lacking DTEND are imported with an end at the last microsecond of a day."""
    agenda = Agenda.objects.create(label=u'Test 3 agenda')
    desk = Desk.objects.create(label='Test 3 desk', agenda=agenda)

    # Drop every end datetime from the sample calendar.
    kept_lines = [row for row in ICS_SAMPLE.splitlines() if not row.startswith('DTEND:')]
    ics_sample = ContentFile("\n".join(kept_lines), name='sample.ics')
    desk.import_timeperiod_exceptions_from_ics_file(ics_sample)

    last_instant = datetime.time(23, 59, 59, 999999)
    for imported in TimePeriodException.objects.filter(desk=desk):
        assert localtime(imported.end_datetime).time() == last_instant
|
|
|
|
|
|
@pytest.mark.freeze_time('2017-12-01')
def test_timeperiodexception_creation_from_ics_with_recurrences():
    """With time frozen at 2017-12-01, the recurrent sample yields three exceptions."""
    agenda = Agenda.objects.create(label=u'Test 4 agenda')
    desk = Desk.objects.create(label='Test 4 desk', agenda=agenda)

    ics_file = ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics')
    imported = desk.import_timeperiod_exceptions_from_ics_file(ics_file)

    assert imported == 3
    assert TimePeriodException.objects.filter(desk=desk).count() == 3
|
|
|
|
|
|
def test_timeexception_creation_from_ics_with_dates():
    """Date-only events are imported as starting and ending at local midnight."""
    agenda = Agenda.objects.create(label=u'Test 5 agenda')
    desk = Desk.objects.create(label='Test 5 desk', agenda=agenda)

    # Drop the recurrence rules so that each event yields a single exception.
    kept_lines = [
        row for row in ICS_SAMPLE_WITH_RECURRENT_EVENT.splitlines() if not row.startswith('RRULE:')
    ]
    ics_sample = ContentFile("\n".join(kept_lines), name='sample.ics')

    imported = desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
    assert imported == 2

    for stored in TimePeriodException.objects.filter(desk=desk):
        assert localtime(stored.start_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0))
        assert localtime(stored.end_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0))
|
|
|
|
|
|
def test_timeexception_create_from_invalid_ics():
    """Content that is not an iCalendar document is rejected with an ICSError."""
    agenda = Agenda.objects.create(label=u'Test 6 agenda')
    desk = Desk.objects.create(label='Test 6 desk', agenda=agenda)

    invalid_file = ContentFile(INVALID_ICS_SAMPLE, name='sample.ics')
    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_ics_file(invalid_file)
    assert str(excinfo.value) == 'File format is invalid.'
|
|
|
|
|
|
def test_timeexception_create_from_ics_with_no_events():
    """A calendar without any event is rejected with an ICSError."""
    agenda = Agenda.objects.create(label=u'Test 7 agenda')
    desk = Desk.objects.create(label='Test 7 desk', agenda=agenda)

    empty_calendar = ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics')
    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_ics_file(empty_calendar)
    assert str(excinfo.value) == "The file doesn't contain any events."
|
|
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_remote_ics(mocked_get):
    """Exceptions are imported from a remote calendar; an empty remote calendar raises."""
    agenda = Agenda.objects.create(label=u'Test 8 agenda')
    desk = Desk.objects.create(label='Test 8 desk', agenda=agenda)
    remote_url = 'http://example.com/sample.ics'

    # First fetch: the remote calendar holds two events.
    response = mock.Mock()
    response.text = ICS_SAMPLE
    mocked_get.return_value = response
    imported = desk.import_timeperiod_exceptions_from_remote_ics(remote_url)
    assert imported == 2
    stored_labels = [exc.label for exc in desk.timeperiodexception_set.all()]
    assert 'Événement 1' in stored_labels

    # Second fetch: the same response object now serves a calendar without events.
    response.text = ICS_SAMPLE_WITH_NO_EVENTS
    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_remote_ics(remote_url)
    assert str(excinfo.value) == "The file doesn't contain any events."
|
|
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_remote_ics_encoding(mocked_get):
    """A response whose raw content is ISO-8859-15 encoded still yields the accented label."""
    agenda = Agenda.objects.create(label=u'Test 8 agenda')
    desk = Desk.objects.create(label='Test 8 desk', agenda=agenda)

    response = mock.Mock()
    response.content = ICS_SAMPLE.encode('iso-8859-15')
    response.text = ICS_SAMPLE
    mocked_get.return_value = response

    imported = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert imported == 2
    stored_labels = [exc.label for exc in desk.timeperiodexception_set.all()]
    assert 'Événement 1' in stored_labels
|
|
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
    """A connection error while fetching the calendar is reported as an ICSError."""
    agenda = Agenda.objects.create(label=u'Test 9 agenda')
    desk = Desk.objects.create(label='Test 9 desk', agenda=agenda)

    # The side effect wins over any return value: every call raises.
    mocked_get.side_effect = requests.ConnectionError('unreachable')

    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert (
        str(excinfo.value)
        == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)."
    )
|
|
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
    """An HTTP 403 while fetching the calendar is reported as an ICSError."""
    agenda = Agenda.objects.create(label=u'Test 10 agenda')
    desk = Desk.objects.create(label='Test 10 desk', agenda=agenda)

    forbidden_response = mock.Mock()
    forbidden_response.status_code = 403
    # The side effect wins over any return value: every call raises.
    mocked_get.side_effect = requests.HTTPError(response=forbidden_response)

    with pytest.raises(ICSError) as excinfo:
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert (
        str(excinfo.value)
        == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)."
    )
|
|
|
|
|
|
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
    """Exercise the ``sync_desks_timeperiod_exceptions`` command.

    Checks the error written to stderr when the remote calendar is forbidden,
    then which desk import method the command calls depending on whether the
    source has a URL, a file, or neither.
    """
    agenda = Agenda.objects.create(label=u'Test 11 agenda')
    desk = Desk.objects.create(label='Test 11 desk', agenda=agenda)
    source = TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')

    forbidden_response = mock.Mock()
    forbidden_response.status_code = 403
    mocked_get.side_effect = requests.HTTPError(response=forbidden_response)

    call_command('sync_desks_timeperiod_exceptions')
    _, err = capsys.readouterr()
    assert (
        err
        == 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403).\n'
    )

    def run_sync_with_patched_imports():
        # Run the command with both desk import methods mocked out and return
        # the calls each of them received.
        remote_patch = mock.patch('chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics')
        file_patch = mock.patch('chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file')
        with remote_patch as import_remote_ics, file_patch as import_file_ics:
            call_command('sync_desks_timeperiod_exceptions')
        return import_remote_ics.call_args_list, import_file_ics.call_args_list

    # Source backed by a URL: only the remote import is used.
    assert source.ics_url is not None
    assert source.ics_filename is None
    assert source.ics_file.name is None
    remote_calls, file_calls = run_sync_with_patched_imports()
    assert remote_calls == [mock.call('http://example.com/sample.ics', source=source)]
    assert file_calls == []

    # Source backed by an uploaded file: only the file import is used.
    source.ics_url = None
    source.ics_filename = 'sample.ics'
    source.ics_file = ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
    source.save()
    remote_calls, file_calls = run_sync_with_patched_imports()
    assert remote_calls == []
    assert file_calls == [mock.call(mock.ANY, source=source)]

    # Empty or null file field: nothing is imported.
    for blank_value in ('', None):
        TimePeriodExceptionSource.objects.update(ics_file=blank_value)
        remote_calls, file_calls = run_sync_with_patched_imports()
        assert remote_calls == []
        assert file_calls == []
|
|
|
|
|
|
@override_settings(
    EXCEPTIONS_SOURCES={'holidays': {'class': 'workalendar.europe.France', 'label': 'Holidays'}}
)
def test_timeperiodexception_from_settings():
    """Saving a desk loads the holidays declared in settings; the source can be toggled."""
    agenda = Agenda(label=u'Test 1 agenda')
    agenda.save()
    desk = Desk(label='Test 1 desk', agenda=agenda)
    desk.save()

    # first save automatically load exceptions
    source = TimePeriodExceptionSource.objects.get(desk=desk)
    assert source.settings_slug == 'holidays'
    assert source.enabled
    assert TimePeriodException.objects.filter(desk=desk, source=source).exists()

    # Imported locally, as in the original test, so that the module itself does
    # not import workalendar at load time.
    from workalendar.europe import France

    # The first French holiday must be stored as a full-day exception on its date.
    # (The original also fetched ``TimePeriodException.objects.first()`` here, but
    # that value was overwritten before use, so the query was dropped.)
    date, label = France().holidays()[0]
    exception = TimePeriodException.objects.filter(label=label).first()
    assert exception.end_datetime - exception.start_datetime == datetime.timedelta(days=1)
    assert localtime(exception.start_datetime).date() == date

    # Disabling the source removes its exceptions...
    source.disable()
    assert not source.enabled
    assert not TimePeriodException.objects.filter(desk=desk, source=source).exists()

    # ... and enabling it again brings them back.
    source.enable()
    assert source.enabled
    assert TimePeriodException.objects.filter(desk=desk, source=source).exists()
|
|
|
|
|
|
def test_timeperiodexception_from_settings_command():
    """Exercise the ``sync_desks_timeperiod_exceptions_from_settings`` command."""

    def holidays_sources(calendar_class):
        # Build the EXCEPTIONS_SOURCES setting for the given workalendar class path.
        return {'holidays': {'class': calendar_class, 'label': 'Holidays'}}

    agenda = Agenda.objects.create(label=u'Test 1 agenda')
    # desk1 is created before the setting exists.
    desk1 = Desk.objects.create(label='Test 1 desk', agenda=agenda)

    with override_settings(EXCEPTIONS_SOURCES=holidays_sources('workalendar.europe.France')):
        desk2 = Desk.objects.create(label='Test 2 desk', agenda=agenda)
        desk3 = Desk.objects.create(label='Test 3 desk', agenda=agenda)
        source3 = TimePeriodExceptionSource.objects.get(desk=desk3)
        source3.disable()

        call_command('sync_desks_timeperiod_exceptions_from_settings')

    # After the sync, desk1 has a source that is not enabled.
    assert not TimePeriodExceptionSource.objects.get(desk=desk1).enabled
    source2 = TimePeriodExceptionSource.objects.get(desk=desk2)
    assert source2.enabled
    source3.refresh_from_db()
    assert not source3.enabled

    count_before = source2.timeperiodexception_set.count()
    # Alsace Moselle has more holidays
    with override_settings(
        EXCEPTIONS_SOURCES=holidays_sources('workalendar.europe.FranceAlsaceMoselle')
    ):
        call_command('sync_desks_timeperiod_exceptions_from_settings')
    source2.refresh_from_db()
    assert count_before < source2.timeperiodexception_set.count()

    # With no source left in settings, the sync removes every source.
    with override_settings(EXCEPTIONS_SOURCES={}):
        call_command('sync_desks_timeperiod_exceptions_from_settings')
    assert not TimePeriodExceptionSource.objects.exists()
|
|
|
|
|
|
def test_base_meeting_duration():
    """Track the base meeting duration of an agenda as meeting types are added."""
    agenda = Agenda.objects.create(label='Meeting', kind='meetings')

    # No meeting type yet: there is nothing to compute a duration from.
    with pytest.raises(ValueError):
        agenda.get_base_meeting_duration()

    # (label, duration of the new meeting type, expected base duration afterwards)
    steps = [
        ('Foo', 30, 30),
        ('Bar', 60, 30),
        ('Bar', 45, 15),
    ]
    for label, duration, expected_base in steps:
        MeetingType.objects.create(agenda=agenda, label=label, duration=duration)
        assert agenda.get_base_meeting_duration() == expected_base
|
|
|
|
|
|
def test_timeperiodexception_creation_from_ics_with_duration():
    """Events defined with DURATION get the same start and end dates as with DTEND."""
    agenda = Agenda.objects.create(label=u'Test 1 agenda')
    desk = Desk.objects.create(label='Test 1 desk', agenda=agenda)

    ics_file = ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
    imported = desk.import_timeperiod_exceptions_from_ics_file(ics_file)
    assert imported == 2
    assert TimePeriodException.objects.filter(desk=desk).count() == 2

    expected_starts = {
        make_aware(datetime.datetime(2017, 8, 31, 19, 8, 0)),
        make_aware(datetime.datetime(2017, 8, 30, 20, 8, 0)),
    }
    expected_ends = {
        make_aware(datetime.datetime(2017, 8, 31, 22, 34, 0)),
        make_aware(datetime.datetime(2017, 9, 1, 0, 34, 0)),
    }
    stored_starts = set(TimePeriodException.objects.values_list('start_datetime', flat=True))
    stored_ends = set(TimePeriodException.objects.values_list('end_datetime', flat=True))
    assert stored_starts == expected_starts
    assert stored_ends == expected_ends
|
|
|
|
|
|
@pytest.mark.freeze_time('2017-12-01')
def test_timeperiodexception_creation_from_ics_with_recurrences_in_the_past():
    """Occurrences of a recurrent event that lie before today are not created.

    Also covers a recurrent event defined with a DURATION.
    """
    agenda = Agenda.objects.create(label=u'Test 4 agenda')
    desk = Desk.objects.create(label='Test 4 desk', agenda=agenda)

    ics_file = ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics')
    imported = desk.import_timeperiod_exceptions_from_ics_file(ics_file)
    assert imported == 2
    assert TimePeriodException.objects.filter(desk=desk).count() == 2

    expected_starts = {
        make_aware(datetime.datetime(2018, 1, 1)),
        make_aware(datetime.datetime(2019, 1, 1)),
    }
    stored_starts = set(TimePeriodException.objects.values_list('start_datetime', flat=True))
    assert stored_starts == expected_starts
|
|
|
|
|
|
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
    """Importing the ``atreal.ics`` data file creates at least one exception."""
    agenda = Agenda.objects.create(label=u'Test atreal agenda')
    desk = Desk.objects.create(label='Test atreal desk', agenda=agenda)

    atreal_file = ContentFile(ICS_ATREAL, name='sample.ics')
    imported = desk.import_timeperiod_exceptions_from_ics_file(atreal_file)
    assert imported
|
|
|
|
|
|
def test_management_role_deletion():
    """Deleting the group used as edit/view role keeps the agenda and clears both roles."""
    group = Group(name=u'Group')
    group.save()
    agenda = Agenda(label=u'Test agenda', edit_role=group, view_role=group)
    agenda.save()

    Group.objects.all().delete()

    # The original lines were bare ``... is None`` expressions without ``assert``,
    # so nothing was actually checked.
    reloaded = Agenda.objects.get(id=agenda.id)
    assert reloaded.view_role is None
    assert reloaded.edit_role is None
|
|
|
|
|
|
def test_event_bookings_annotation():
    """``Event.annotate_queryset`` counts active bookings and waiting-list entries per event.

    Cancelled bookings are expected to be excluded from both counters.
    """
    agenda = Agenda(label='test', kind='events')
    agenda.save()
    event = Event(start_datetime=now(), label='foo', places=10, waiting_list_places=10, agenda=agenda)
    event.save()
    event2 = Event(start_datetime=now(), label='bar', places=10, waiting_list_places=10, agenda=agenda)
    event2.save()

    # 'foo': two active bookings, one cancelled, one waiting, one cancelled while waiting
    Booking(event=event).save()
    Booking(event=event).save()
    Booking(event=event, cancellation_datetime=now()).save()
    Booking(event=event, in_waiting_list=True).save()
    Booking(event=event, in_waiting_list=True, cancellation_datetime=now()).save()

    # 'bar': three active bookings
    Booking(event=event2).save()
    Booking(event=event2).save()
    Booking(event=event2).save()

    # Compare the whole mapping at once: the previous if/elif inside a loop passed
    # silently when the queryset was empty or when an event was missing from it.
    counts_by_label = {
        annotated.label: (annotated.booked_places_count, annotated.waiting_list_count)
        for annotated in Event.annotate_queryset(Event.objects.filter(agenda=agenda))
    }
    assert counts_by_label == {'foo': (2, 1), 'bar': (3, 0)}
|
|
|
|
|
|
def test_virtual_agenda_init():
    """A virtual agenda and its real agendas see each other through their relations."""
    members = [
        Agenda.objects.create(label=u'Agenda 1', kind='meetings'),
        Agenda.objects.create(label=u'Agenda 2', kind='meetings'),
    ]
    virt_agenda = Agenda.objects.create(label=u'Virtual agenda', kind='virtual')
    for member in members:
        VirtualMember.objects.create(virtual_agenda=virt_agenda, real_agenda=member)
    virt_agenda.save()

    # Virtual -> real direction
    assert virt_agenda.real_agendas.count() == 2
    for member in members:
        assert virt_agenda.real_agendas.get(pk=member.pk)

    # Real -> virtual direction
    for member in members:
        assert member.virtual_agendas.count() == 1
        assert member.virtual_agendas.get() == virt_agenda
|
|
|
|
|
|
def test_virtual_agenda_base_meeting_duration():
    """Track the base meeting duration of a virtual agenda as members and types are added."""
    virt_agenda = Agenda.objects.create(label=u'Virtual agenda', kind='virtual')

    # No member agenda at all
    with pytest.raises(ValueError):
        virt_agenda.get_base_meeting_duration()

    # One member agenda, but still no meeting type
    first_member = Agenda.objects.create(label='Agenda 1', kind='meetings')
    VirtualMember.objects.create(virtual_agenda=virt_agenda, real_agenda=first_member)
    with pytest.raises(ValueError):
        virt_agenda.get_base_meeting_duration()

    # Meeting types added to the single member
    for label, duration in (('Foo', 30), ('Bar', 60)):
        MeetingType.objects.create(agenda=first_member, label=label, duration=duration)
        assert virt_agenda.get_base_meeting_duration() == 30

    # A second member that only offers the 60 minutes meeting type
    second_member = Agenda.objects.create(label='Agenda 2', kind='meetings')
    VirtualMember.objects.create(virtual_agenda=virt_agenda, real_agenda=second_member)
    virt_agenda.save()

    MeetingType.objects.create(agenda=second_member, label='Bar', duration=60)
    assert virt_agenda.get_base_meeting_duration() == 60
|
|
|
|
|
|
def test_agenda_get_effective_time_periods(db):
    """Check how excluded time periods carve the effective periods of a virtual agenda.

    The real agenda has one desk open on weekday 0 from 10:00 to 18:00; each
    scenario replaces the virtual agenda exclusions and checks the remaining
    periods.
    """
    real_agenda = Agenda.objects.create(label='Real Agenda', kind='meetings')
    MeetingType.objects.create(agenda=real_agenda, label='MT1')
    desk = Desk.objects.create(label='Real Agenda Desk1', agenda=real_agenda)
    time_period = TimePeriod.objects.create(
        weekday=0, start_time=datetime.time(10, 0), end_time=datetime.time(18, 0), desk=desk
    )
    virtual_agenda = Agenda.objects.create(label='Virtual Agenda', kind='virtual')
    VirtualMember.objects.create(virtual_agenda=virtual_agenda, real_agenda=real_agenda)

    day = time_period.weekday
    hm = datetime.time  # shorthand: hm(hour, minute)

    def effective_periods():
        # Effective periods of the virtual agenda as comparable tuples.
        return [
            (period.weekday, period.start_time, period.end_time)
            for period in virtual_agenda.get_effective_time_periods()
        ]

    def set_exclusions(*slots):
        # Replace the exclusions of the virtual agenda with the given
        # (weekday, start, end) slots.
        virtual_agenda.excluded_timeperiods.clear()
        for weekday, start, end in slots:
            excluded = TimePeriod(weekday=weekday, start_time=start, end_time=end)
            excluded.agenda = virtual_agenda
            excluded.save()

    untouched = [(day, time_period.start_time, time_period.end_time)]

    # empty exclusion set
    assert effective_periods() == untouched

    # exclusions are on a different day
    set_exclusions((1, hm(17, 0), hm(18, 0)), (2, hm(17, 0), hm(18, 0)))
    assert effective_periods() == untouched

    # one exclusion, end_time should be earlier
    set_exclusions((0, hm(17, 0), hm(18, 0)))
    assert effective_periods() == [(day, hm(10, 0), hm(17, 0))]

    # one exclusion, start_time should be later
    set_exclusions((0, hm(10, 0), hm(16, 0)))
    assert effective_periods() == [(day, hm(16, 0), hm(18, 0))]

    # one exclusion, splits effective timeperiod in two
    set_exclusions((0, hm(12, 0), hm(16, 0)))
    assert effective_periods() == [
        (day, hm(10, 0), hm(12, 0)),
        (day, hm(16, 0), hm(18, 0)),
    ]

    # several exclusions, split effective timeperiod into pieces
    set_exclusions(
        (0, hm(12, 0), hm(13, 0)),
        (0, hm(10, 30), hm(11, 30)),
        (0, hm(16, 30), hm(17, 0)),
    )
    assert effective_periods() == [
        (day, hm(10, 0), hm(10, 30)),
        (day, hm(11, 30), hm(12, 0)),
        (day, hm(13, 0), hm(16, 30)),
        (day, hm(17, 0), hm(18, 0)),
    ]
|
|
|
|
|
|
def test_desk_exceptions_within_two_weeks():
    """Move a desk exception around "now" and check what the desk reports.

    Exercises ``Desk.get_exceptions_within_two_weeks()``: which exceptions
    are returned, and in which order, as start/end datetimes change.
    """

    def from_now(days):
        # a negative number of days yields a datetime in the past
        return now() + datetime.timedelta(days=days)

    def reschedule(period, start_days, end_days):
        period.start_datetime = from_now(start_days)
        period.end_datetime = from_now(end_days)
        period.save()

    meetings_agenda = Agenda.objects.create(label='Agenda', kind='meetings')
    desk = Desk.objects.create(agenda=meetings_agenda, label='Desk')

    def reported():
        return list(desk.get_exceptions_within_two_weeks())

    # a desk without any exception reports nothing
    assert reported() == []

    # an exception that is already over is not reported
    exception = TimePeriodException.objects.create(
        desk=desk,
        start_datetime=from_now(-2),
        end_datetime=from_now(-1),
    )
    assert reported() == []

    # started in the past, ends before the 14 days limit
    reschedule(exception, start_days=-2, end_days=10)
    assert reported() == [exception]

    # starts in the future, ends before the 14 days limit
    reschedule(exception, start_days=2, end_days=10)
    assert reported() == [exception]

    # started in the past, ends after the 14 days limit
    reschedule(exception, start_days=-2, end_days=20)
    assert reported() == [exception]

    # add a second exception, very far away from now
    exception2 = TimePeriodException.objects.create(
        desk=desk,
        start_datetime=from_now(200),
        end_datetime=from_now(201),
    )

    # starts before the 14 days limit, ends after it
    reschedule(exception, start_days=2, end_days=20)
    assert reported() == [exception]

    # both start and end are after the 14 days limit
    # (the start is deliberately kept later than the end, as in the fixture)
    reschedule(exception, start_days=21, end_days=20)
    assert reported() == [exception]

    # first exception entirely in the past: the far away one is returned
    reschedule(exception, start_days=-21, end_days=-20)
    assert reported() == [exception2]

    # ordering of the queryset: exception now comes after exception2
    reschedule(exception, start_days=10, end_days=11)
    reschedule(exception2, start_days=5, end_days=6)
    assert reported() == [exception2, exception]
|
|
|
|
|
|
def test_desk_duplicate():
    """Duplicating a desk copies its time periods, exceptions and ICS sources."""
    parent_agenda = Agenda.objects.create(label='Agenda')
    original = Desk.objects.create(label='Desk', agenda=parent_agenda)
    opening_hours = TimePeriod.objects.create(
        weekday=1,
        desk=original,
        start_time=datetime.time(10, 0),
        end_time=datetime.time(12, 0),
    )
    # one source referenced by URL, one backed by an uploaded file
    TimePeriodExceptionSource.objects.create(desk=original, ics_url='http://example.com/sample.ics')
    file_source = TimePeriodExceptionSource.objects.create(
        desk=original,
        ics_filename='sample.ics',
        ics_file=ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics'),
    )
    closing = TimePeriodException.objects.create(
        label='Exception',
        desk=original,
        start_datetime=now() + datetime.timedelta(days=1),
        end_datetime=now() + datetime.timedelta(days=2),
    )

    clone = original.duplicate(label="New Desk")
    assert clone.pk != original.pk
    assert clone.label == 'New Desk'
    assert clone.slug == 'new-desk'

    # the single time period is copied field by field
    assert clone.timeperiod_set.count() == 1
    cloned_hours = TimePeriod.objects.get(desk=clone)
    for field in ('weekday', 'start_time', 'end_time'):
        assert getattr(cloned_hours, field) == getattr(opening_hours, field)

    # so is the single exception
    assert clone.timeperiodexception_set.count() == 1
    cloned_closing = TimePeriodException.objects.get(desk=clone)
    for field in ('label', 'start_datetime', 'end_datetime'):
        assert getattr(cloned_closing, field) == getattr(closing, field)

    # both sources are copied; the file-backed one gets its own file path
    assert clone.timeperiodexceptionsource_set.count() == 2
    url_sources = TimePeriodExceptionSource.objects.filter(
        desk=clone, ics_url='http://example.com/sample.ics'
    )
    assert url_sources.exists()
    cloned_file_source = TimePeriodExceptionSource.objects.get(desk=clone, ics_filename='sample.ics')
    assert cloned_file_source.ics_file.path != file_source.ics_file.path

    # duplicating again with the same label yields a suffixed slug
    second_clone = original.duplicate(label="New Desk")
    assert second_clone.slug == 'new-desk-1'
|
|
|
|
|
|
def test_agenda_meetings_duplicate():
    """Duplicating a meetings agenda copies its role, resources, meeting types and desks."""
    viewers = Group(name=u'Group')
    viewers.save()
    original = Agenda.objects.create(label='Agenda', kind='meetings', view_role=viewers)
    desk = Desk.objects.create(label='Desk', agenda=original)
    meeting_type = MeetingType.objects.create(agenda=original, label='meeting', duration=30)
    opening_hours = TimePeriod.objects.create(
        weekday=1,
        desk=desk,
        start_time=datetime.time(10, 0),
        end_time=datetime.time(12, 0),
    )
    # one source referenced by URL, one backed by an uploaded file
    TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
    file_source = TimePeriodExceptionSource.objects.create(
        desk=desk,
        ics_filename='sample.ics',
        ics_file=ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics'),
    )
    closing = TimePeriodException.objects.create(
        label='Exception',
        desk=desk,
        start_datetime=now() + datetime.timedelta(days=1),
        end_datetime=now() + datetime.timedelta(days=2),
    )
    room = Resource.objects.create(label=u'Foo bar')
    original.resources.add(room)

    clone = original.duplicate()
    assert clone.pk != original.pk
    assert clone.label == 'Copy of Agenda'
    assert clone.slug == 'copy-of-agenda'
    assert clone.kind == 'meetings'
    assert clone.view_role == viewers
    assert clone.resources.first() == room

    # meeting types are new rows carrying the same label, duration and slug
    cloned_type = clone.meetingtype_set.first()
    assert cloned_type.pk != meeting_type.pk
    for field in ('label', 'duration', 'slug'):
        assert getattr(cloned_type, field) == getattr(meeting_type, field)

    # desks are new rows too, keeping label and slug
    cloned_desk = clone.desk_set.first()
    assert cloned_desk.pk != desk.pk
    for field in ('label', 'slug'):
        assert getattr(cloned_desk, field) == getattr(desk, field)

    assert cloned_desk.timeperiod_set.count() == 1
    cloned_hours = TimePeriod.objects.get(desk=cloned_desk)
    for field in ('weekday', 'start_time', 'end_time'):
        assert getattr(cloned_hours, field) == getattr(opening_hours, field)

    assert cloned_desk.timeperiodexception_set.count() == 1
    cloned_closing = TimePeriodException.objects.get(desk=cloned_desk)
    for field in ('label', 'start_datetime', 'end_datetime'):
        assert getattr(cloned_closing, field) == getattr(closing, field)

    # both sources are copied; the file-backed one gets its own file path
    assert cloned_desk.timeperiodexceptionsource_set.count() == 2
    url_sources = TimePeriodExceptionSource.objects.filter(
        desk=cloned_desk, ics_url='http://example.com/sample.ics'
    )
    assert url_sources.exists()
    cloned_file_source = TimePeriodExceptionSource.objects.get(
        desk=cloned_desk, ics_filename='sample.ics'
    )
    assert cloned_file_source.ics_file.path != file_source.ics_file.path

    # duplicating again yields a suffixed slug
    second_clone = original.duplicate()
    assert second_clone.slug == 'copy-of-agenda-1'
|
|
|
|
|
|
def test_agenda_events_duplicate():
    """Duplicating an events agenda copies its events into the new agenda."""
    agenda = Agenda.objects.create(label='Agenda', kind='events')
    event = Event.objects.create(
        agenda=agenda,
        start_datetime=now() + datetime.timedelta(days=1),
        duration=10,
        places=10,
        label='event',
        slug='event',
    )

    new_agenda = agenda.duplicate()
    assert new_agenda.pk != agenda.pk
    assert new_agenda.kind == 'events'

    # the copied event is a distinct row carrying the same data
    new_event = new_agenda.event_set.first()
    assert new_event.pk != event.pk
    assert new_event.label == event.label
    # NOTE(review): the slug is set explicitly above but is not compared
    # here; whether duplicate() keeps event slugs is not visible from this
    # test - confirm before asserting it.
    assert new_event.duration == event.duration
    assert new_event.places == event.places
    assert new_event.start_datetime == event.start_datetime
|
|
|
|
|
|
def test_agenda_virtual_duplicate():
    """Duplicating a virtual agenda copies excluded time periods and members."""
    real_agendas = [
        Agenda.objects.create(label=u'Agenda 1', kind='meetings'),
        Agenda.objects.create(label=u'Agenda 2', kind='meetings'),
    ]
    virtual = Agenda.objects.create(label=u'Virtual agenda', kind='virtual')
    for real in real_agendas:
        VirtualMember.objects.create(virtual_agenda=virtual, real_agenda=real)
    virtual.save()

    excluded = TimePeriod.objects.create(
        weekday=1,
        start_time=datetime.time(10, 0),
        end_time=datetime.time(12, 0),
        agenda=virtual,
    )

    clone = virtual.duplicate()
    assert clone.pk != virtual.pk
    assert clone.kind == 'virtual'

    # the excluded period is a new row attached to the clone
    cloned_excluded = clone.excluded_timeperiods.first()
    assert cloned_excluded.pk != excluded.pk
    assert cloned_excluded.agenda == clone
    for field in ('weekday', 'start_time', 'end_time'):
        assert getattr(cloned_excluded, field) == getattr(excluded, field)

    # every real agenda is also a member of the clone
    for real in real_agendas:
        assert VirtualMember.objects.filter(virtual_agenda=clone, real_agenda=real).exists()
|
|
|
|
|
|
def test_agendas_cancel_events_command():
    """``cancel_events`` calls every booking callback and cancels the event."""
    booking_count = 5
    events_agenda = Agenda.objects.create(label='Events', kind='events')
    event = Event.objects.create(agenda=events_agenda, start_datetime=now(), places=10, label='Event')

    for _ in range(booking_count):
        Booking.objects.create(event=event, cancel_callback_url='http://example.org/jump/trigger/')

    # flag the event so that the command picks it up
    event.cancellation_scheduled = True
    event.save()

    with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send:
        send.return_value = mock.Mock(status_code=200)
        call_command('cancel_events')
        # one callback request per booking
        assert send.call_count == booking_count

    cancelled_bookings = Booking.objects.filter(cancellation_datetime__isnull=False)
    assert cancelled_bookings.count() == booking_count
    event.refresh_from_db()
    assert not event.cancellation_scheduled
    assert event.cancelled
|
|
|
|
|
|
def test_agendas_cancel_events_command_network_error(freezer):
    """``cancel_events`` records per-booking errors when callbacks are unreachable.

    One booking has a reachable callback URL and five do not. The test checks
    that only the reachable booking is cancelled, that the event is left
    uncancelled, that an error report lists the five failures, and that the
    report is gone after a later run of the command.
    """
    freezer.move_to('2020-01-01')
    agenda = Agenda.objects.create(label='Events', kind='events')
    event = Event.objects.create(agenda=agenda, start_datetime=now(), places=10, label='Event')

    def mocked_requests_connection_error(request, **kwargs):
        # only URLs containing 'good' get an answer; the others are unreachable
        if 'good' in request.url:
            return mock.Mock(status_code=200)
        raise requests.exceptions.ConnectionError('unreachable')

    booking_good_url = Booking.objects.create(event=event, cancel_callback_url='http://good.org/')
    for _ in range(5):
        Booking.objects.create(event=event, cancel_callback_url='http://example.org/jump/trigger/')

    event.cancellation_scheduled = True
    event.save()

    with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as mock_send:
        # the side effect always returns or raises, so it alone decides every
        # response; no return_value needs to be configured on the mock
        mock_send.side_effect = mocked_requests_connection_error
        call_command('cancel_events')
        # every booking callback was attempted: 1 reachable + 5 unreachable
        assert mock_send.call_count == 6

    booking_good_url.refresh_from_db()
    assert booking_good_url.cancellation_datetime
    assert Booking.objects.filter(cancellation_datetime__isnull=True).count() == 5
    event.refresh_from_db()
    assert not event.cancellation_scheduled
    assert not event.cancelled

    # the failures are collected in a report, keyed by booking pk
    report = EventCancellationReport.objects.get(event=event)
    assert report.bookings.count() == 5
    assert len(report.booking_errors) == 5

    for booking in report.bookings.all():
        assert report.booking_errors[str(booking.pk)] == 'unreachable'

    # old reports are automatically removed
    freezer.move_to('2020-03-01')
    call_command('cancel_events')
    assert not EventCancellationReport.objects.exists()
|