345 lines
14 KiB
Python
345 lines
14 KiB
Python
import datetime
|
||
|
||
import pytest
|
||
from django.db import IntegrityError, ProgrammingError, connection, transaction
|
||
from django.db.migrations.executor import MigrationExecutor
|
||
from django.test import override_settings
|
||
from django.utils.timezone import now
|
||
|
||
from chrono.agendas.models import Agenda, Booking, Desk, Event, MeetingType
|
||
|
||
pytestmark = pytest.mark.django_db
|
||
|
||
|
||
def check_ignore_reason(event, value):
|
||
with connection.cursor() as cursor:
|
||
cursor.execute("SELECT _ignore_reason FROM agendas_event WHERE id = %s", [event.pk])
|
||
row = cursor.fetchone()
|
||
assert row[0] == value
|
||
|
||
|
||
def set_ignore_reason(event, value):
|
||
with connection.cursor() as cursor:
|
||
cursor.execute("UPDATE agendas_event SET _ignore_reason = %s WHERE id = %s", [value, event.pk])
|
||
|
||
|
||
def check_end_datetime(event, value):
|
||
with connection.cursor() as cursor:
|
||
cursor.execute("SELECT _end_datetime FROM agendas_event WHERE id = %s", [event.pk])
|
||
row = cursor.fetchone()
|
||
assert row[0] == value
|
||
|
||
|
||
def test_event_ignore_reason():
|
||
agenda = Agenda.objects.create(label='Meetings', kind='meetings')
|
||
meeting_type = MeetingType.objects.create(agenda=agenda, label='Foo', duration=60)
|
||
desk = Desk.objects.create(agenda=agenda, label='Desk')
|
||
|
||
event = Event.objects.create(
|
||
start_datetime=now(), meeting_type=meeting_type, places=10, agenda=agenda, desk=desk
|
||
)
|
||
|
||
try:
|
||
check_ignore_reason(event, None)
|
||
except ProgrammingError:
|
||
pytest.skip('btree_Gist extension required')
|
||
|
||
Booking.objects.create(event=event)
|
||
check_ignore_reason(event, None)
|
||
Booking.objects.create(event=event, cancellation_datetime=now())
|
||
check_ignore_reason(event, 'cancel')
|
||
Booking.objects.update(cancellation_datetime=None)
|
||
check_ignore_reason(event, None)
|
||
Booking.objects.update(cancellation_datetime=now())
|
||
check_ignore_reason(event, 'cancel')
|
||
Booking.objects.first().delete()
|
||
check_ignore_reason(event, 'delete')
|
||
Booking.objects.all().delete()
|
||
check_ignore_reason(event, 'delete')
|
||
|
||
|
||
def test_event_end_datetime():
|
||
agenda = Agenda.objects.create(label='Meetings', kind='meetings')
|
||
meeting_type1 = MeetingType.objects.create(agenda=agenda, label='Foo', duration=60)
|
||
meeting_type2 = MeetingType.objects.create(agenda=agenda, label='Foo', duration=45)
|
||
|
||
event = Event.objects.create(start_datetime=now(), places=10, agenda=agenda)
|
||
|
||
try:
|
||
check_end_datetime(event, None)
|
||
except ProgrammingError:
|
||
pytest.skip('btree_Gist extension required')
|
||
|
||
event.meeting_type = meeting_type1
|
||
event.save()
|
||
check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=60))
|
||
event.meeting_type = meeting_type2
|
||
event.save()
|
||
check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))
|
||
event.start_datetime = now()
|
||
event.save()
|
||
check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))
|
||
event.meeting_type = None
|
||
event.save()
|
||
check_end_datetime(event, None)
|
||
event.meeting_type = meeting_type2
|
||
event.save()
|
||
check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))
|
||
|
||
event2 = Event.objects.create(start_datetime=now(), meeting_type=meeting_type1, places=10, agenda=agenda)
|
||
check_end_datetime(event2, event2.start_datetime + datetime.timedelta(minutes=60))
|
||
meeting_type1.duration = 42
|
||
meeting_type1.save()
|
||
check_end_datetime(event2, event2.start_datetime + datetime.timedelta(minutes=42))
|
||
check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))
|
||
|
||
|
||
def test_meeting_event_exclusion_constraint():
|
||
agenda = Agenda.objects.create(label='Meetings', kind='meetings')
|
||
meeting_type1 = MeetingType.objects.create(agenda=agenda, label='Foo 1', duration=60)
|
||
meeting_type2 = MeetingType.objects.create(agenda=agenda, label='Foo 2', duration=30)
|
||
desk1 = Desk.objects.create(agenda=agenda, label='Desk 1')
|
||
desk2 = Desk.objects.create(agenda=agenda, label='Desk 2')
|
||
|
||
# create an event
|
||
event1 = Event.objects.create(
|
||
start_datetime=now(), meeting_type=meeting_type1, places=10, agenda=agenda, desk=desk1
|
||
)
|
||
|
||
try:
|
||
check_ignore_reason(event1, None)
|
||
except ProgrammingError:
|
||
pytest.skip('btree_Gist extension required')
|
||
|
||
# create a event with the same date range on other desk => no error
|
||
Event.objects.create(
|
||
start_datetime=event1.start_datetime, meeting_type=meeting_type1, places=10, agenda=agenda, desk=desk2
|
||
)
|
||
# no check if no meeting_type set
|
||
Event.objects.create(start_datetime=event1.start_datetime, places=10, agenda=agenda, desk=desk1)
|
||
# no check if no desk set
|
||
Event.objects.create(
|
||
start_datetime=event1.start_datetime, meeting_type=meeting_type1, places=10, agenda=agenda
|
||
)
|
||
|
||
# create an event just after the first one => ok
|
||
Event.objects.create(
|
||
start_datetime=event1.start_datetime + datetime.timedelta(minutes=60),
|
||
meeting_type=meeting_type1,
|
||
places=10,
|
||
agenda=agenda,
|
||
desk=desk1,
|
||
)
|
||
|
||
# try to create an event with the same date range and the same desk => error
|
||
with pytest.raises(IntegrityError):
|
||
with transaction.atomic():
|
||
Event.objects.create(
|
||
start_datetime=event1.start_datetime,
|
||
meeting_type=meeting_type1,
|
||
places=10,
|
||
agenda=agenda,
|
||
desk=desk1,
|
||
)
|
||
# try to create an event with the same start date, other duration and the same desk => error
|
||
with pytest.raises(IntegrityError):
|
||
with transaction.atomic():
|
||
Event.objects.create(
|
||
start_datetime=event1.start_datetime,
|
||
meeting_type=meeting_type2,
|
||
places=10,
|
||
agenda=agenda,
|
||
desk=desk1,
|
||
)
|
||
# try to create an event overlaping the first one on the same desk => error
|
||
with pytest.raises(IntegrityError):
|
||
with transaction.atomic():
|
||
Event.objects.create(
|
||
start_datetime=event1.start_datetime + datetime.timedelta(minutes=10),
|
||
meeting_type=meeting_type1,
|
||
places=10,
|
||
agenda=agenda,
|
||
desk=desk1,
|
||
)
|
||
with pytest.raises(IntegrityError):
|
||
with transaction.atomic():
|
||
Event.objects.create(
|
||
start_datetime=event1.start_datetime + datetime.timedelta(minutes=10),
|
||
meeting_type=meeting_type2,
|
||
places=10,
|
||
agenda=agenda,
|
||
desk=desk1,
|
||
)
|
||
with pytest.raises(IntegrityError):
|
||
with transaction.atomic():
|
||
Event.objects.create(
|
||
start_datetime=event1.start_datetime - datetime.timedelta(minutes=10),
|
||
meeting_type=meeting_type1,
|
||
places=10,
|
||
agenda=agenda,
|
||
desk=desk1,
|
||
)
|
||
with pytest.raises(IntegrityError):
|
||
with transaction.atomic():
|
||
Event.objects.create(
|
||
start_datetime=event1.start_datetime - datetime.timedelta(minutes=10),
|
||
meeting_type=meeting_type2,
|
||
places=10,
|
||
agenda=agenda,
|
||
desk=desk1,
|
||
)
|
||
# but if event1 is cancelled it's ok
|
||
Booking.objects.create(event=event1, cancellation_datetime=now())
|
||
Event.objects.create(
|
||
start_datetime=event1.start_datetime, meeting_type=meeting_type1, places=10, agenda=agenda, desk=desk1
|
||
)
|
||
|
||
|
||
def test_clean_time_period_exceptions(transactional_db):
|
||
app = 'agendas'
|
||
|
||
migrate_from = [(app, '0065_unavailability_calendar')]
|
||
migrate_to = [(app, '0066_timeperiodexceptionsource_unique_settings_slug')]
|
||
executor = MigrationExecutor(connection)
|
||
old_apps = executor.loader.project_state(migrate_from).apps
|
||
executor.migrate(migrate_from)
|
||
|
||
Agenda = old_apps.get_model(app, 'Agenda')
|
||
Desk = old_apps.get_model(app, 'Desk')
|
||
TimePeriodException = old_apps.get_model(app, 'TimePeriodException')
|
||
TimePeriodExceptionSource = old_apps.get_model(app, 'TimePeriodExceptionSource')
|
||
|
||
agenda = Agenda.objects.create(label='Agenda')
|
||
desk = Desk.objects.create(label='Desk', slug='desk', agenda=agenda)
|
||
|
||
# add normal time period exception to Desk
|
||
source_desk = TimePeriodExceptionSource.objects.create(desk=desk, settings_slug='holidays', enabled=True)
|
||
start_datetime = datetime.datetime(year=2020, month=1, day=2)
|
||
end_datetime = datetime.datetime(year=2020, month=1, day=3)
|
||
for _ in range(5):
|
||
TimePeriodException.objects.create(
|
||
desk=desk,
|
||
source=source_desk,
|
||
external=True,
|
||
start_datetime=start_datetime,
|
||
end_datetime=end_datetime,
|
||
)
|
||
|
||
# now simulate broken state (desk duplication)
|
||
new_desk = Desk.objects.create(label='New Desk', slug='new-desk', agenda=agenda)
|
||
|
||
# normal source and exceptions
|
||
source_new_desk = TimePeriodExceptionSource.objects.create(
|
||
desk=new_desk, settings_slug='holidays', enabled=True
|
||
)
|
||
for _ in range(5):
|
||
TimePeriodException.objects.create(
|
||
desk=new_desk,
|
||
source=source_new_desk,
|
||
external=True,
|
||
start_datetime=start_datetime,
|
||
end_datetime=end_datetime,
|
||
)
|
||
|
||
# wrong duplicate of source
|
||
TimePeriodExceptionSource.objects.create(desk=new_desk, settings_slug='holidays', enabled=True)
|
||
|
||
# wrong duplicate of exceptions, referencing original desk source
|
||
for _ in range(5):
|
||
TimePeriodException.objects.create(
|
||
desk=new_desk,
|
||
source=source_desk,
|
||
external=True,
|
||
start_datetime=start_datetime,
|
||
end_datetime=end_datetime,
|
||
)
|
||
|
||
# extra data that should not be touched
|
||
other_exception = TimePeriodException.objects.create(
|
||
desk=desk,
|
||
start_datetime=start_datetime,
|
||
end_datetime=end_datetime,
|
||
)
|
||
other_source = TimePeriodExceptionSource.objects.create(desk=desk, ics_file='test.ics')
|
||
# even if wrong desk, this exception is not from settings thus should not get removed
|
||
exception_from_ics = TimePeriodException.objects.create(
|
||
desk=new_desk,
|
||
start_datetime=start_datetime,
|
||
end_datetime=end_datetime,
|
||
source=other_source,
|
||
)
|
||
|
||
# ensure migration fixes state
|
||
executor = MigrationExecutor(connection)
|
||
executor.migrate(migrate_to)
|
||
executor.loader.build_graph()
|
||
|
||
apps = executor.loader.project_state(migrate_to).apps
|
||
Desk = apps.get_model(app, 'Desk')
|
||
TimePeriodException = apps.get_model(app, 'TimePeriodException')
|
||
TimePeriodExceptionSource = apps.get_model(app, 'TimePeriodExceptionSource')
|
||
|
||
# original desk hasn't been touched
|
||
desk = Desk.objects.get(pk=desk.pk)
|
||
assert desk.timeperiodexception_set.filter(source__settings_slug='holidays').count() == 5
|
||
assert desk.timeperiodexceptionsource_set.filter(settings_slug='holidays').count() == 1
|
||
assert desk.timeperiodexception_set.filter(pk=other_exception.pk).exists()
|
||
assert desk.timeperiodexceptionsource_set.filter(pk=other_source.pk).exists()
|
||
|
||
# duplicated desk has correct exceptions
|
||
new_desk = Desk.objects.get(pk=new_desk.pk)
|
||
assert new_desk.timeperiodexception_set.filter(source__settings_slug='holidays').count() == 5
|
||
assert new_desk.timeperiodexceptionsource_set.filter(settings_slug='holidays').count() == 1
|
||
assert new_desk.timeperiodexception_set.filter(pk=exception_from_ics.pk).exists()
|
||
|
||
exc = new_desk.timeperiodexception_set.filter(source__settings_slug='holidays').first()
|
||
assert exc.source == new_desk.timeperiodexceptionsource_set.get(settings_slug='holidays')
|
||
|
||
|
||
@override_settings(LANGUAGE_CODE='fr-fr')
|
||
def test_translate_holidays_exceptions(transactional_db):
|
||
app = 'agendas'
|
||
|
||
migrate_from = [(app, '0068_remove_timeperiodexception_external')]
|
||
migrate_to = [(app, '0069_translate_holidays')]
|
||
executor = MigrationExecutor(connection)
|
||
old_apps = executor.loader.project_state(migrate_from).apps
|
||
executor.migrate(migrate_from)
|
||
|
||
Agenda = old_apps.get_model(app, 'Agenda')
|
||
Desk = old_apps.get_model(app, 'Desk')
|
||
TimePeriodException = old_apps.get_model(app, 'TimePeriodException')
|
||
TimePeriodExceptionSource = old_apps.get_model(app, 'TimePeriodExceptionSource')
|
||
|
||
agenda = Agenda.objects.create(label='Agenda')
|
||
desk = Desk.objects.create(label='Desk', slug='desk', agenda=agenda)
|
||
source_desk = TimePeriodExceptionSource.objects.create(desk=desk, settings_slug='holidays')
|
||
start_datetime = datetime.datetime(year=2020, month=1, day=2)
|
||
end_datetime = datetime.datetime(year=2020, month=1, day=3)
|
||
TimePeriodException.objects.create(
|
||
desk=desk,
|
||
source=source_desk,
|
||
label='All Saints Day',
|
||
start_datetime=start_datetime,
|
||
end_datetime=end_datetime,
|
||
)
|
||
TimePeriodException.objects.create(
|
||
desk=desk,
|
||
source=source_desk,
|
||
label='New year',
|
||
start_datetime=start_datetime,
|
||
end_datetime=end_datetime,
|
||
)
|
||
|
||
executor = MigrationExecutor(connection)
|
||
executor.migrate(migrate_to)
|
||
executor.loader.build_graph()
|
||
|
||
apps = executor.loader.project_state(migrate_to).apps
|
||
Desk = apps.get_model(app, 'Desk')
|
||
desk = Desk.objects.get(slug='desk')
|
||
assert not desk.timeperiodexception_set.filter(label='All Saints Day').exists()
|
||
assert not desk.timeperiodexception_set.filter(label='New year').exists()
|
||
assert desk.timeperiodexception_set.filter(label='Toussaint').count() == 1
|
||
assert desk.timeperiodexception_set.filter(label='Jour de l’An').count() == 1
|