309 lines
12 KiB
Python
309 lines
12 KiB
Python
import datetime
|
|
|
|
from django.db import IntegrityError
|
|
from django.db import ProgrammingError
|
|
from django.db import connection
|
|
from django.db import transaction
|
|
from django.db.migrations.executor import MigrationExecutor
|
|
from django.utils.timezone import now
|
|
|
|
|
|
import pytest
|
|
|
|
from chrono.agendas.models import Agenda
|
|
from chrono.agendas.models import Booking
|
|
from chrono.agendas.models import Desk
|
|
from chrono.agendas.models import Event
|
|
from chrono.agendas.models import MeetingType
|
|
|
|
pytestmark = pytest.mark.django_db
|
|
|
|
|
|
def check_ignore_reason(event, value):
|
|
with connection.cursor() as cursor:
|
|
cursor.execute("SELECT _ignore_reason FROM agendas_event WHERE id = %s", [event.pk])
|
|
row = cursor.fetchone()
|
|
assert row[0] == value
|
|
|
|
|
|
def set_ignore_reason(event, value):
|
|
with connection.cursor() as cursor:
|
|
cursor.execute("UPDATE agendas_event SET _ignore_reason = %s WHERE id = %s", [value, event.pk])
|
|
|
|
|
|
def check_end_datetime(event, value):
|
|
with connection.cursor() as cursor:
|
|
cursor.execute("SELECT _end_datetime FROM agendas_event WHERE id = %s", [event.pk])
|
|
row = cursor.fetchone()
|
|
assert row[0] == value
|
|
|
|
|
|
def test_event_ignore_reason():
|
|
if connection.vendor != 'postgresql':
|
|
pytest.skip('postgresql required')
|
|
|
|
agenda = Agenda.objects.create(label='Meetings', kind='meetings')
|
|
meeting_type = MeetingType.objects.create(agenda=agenda, label='Foo', duration=60)
|
|
desk = Desk.objects.create(agenda=agenda, label='Desk')
|
|
|
|
event = Event.objects.create(
|
|
start_datetime=now(), meeting_type=meeting_type, places=10, agenda=agenda, desk=desk
|
|
)
|
|
|
|
try:
|
|
check_ignore_reason(event, None)
|
|
except ProgrammingError:
|
|
pytest.skip('btree_Gist extension required')
|
|
|
|
Booking.objects.create(event=event)
|
|
check_ignore_reason(event, None)
|
|
Booking.objects.create(event=event, cancellation_datetime=now())
|
|
check_ignore_reason(event, 'cancel')
|
|
Booking.objects.update(cancellation_datetime=None)
|
|
check_ignore_reason(event, None)
|
|
Booking.objects.update(cancellation_datetime=now())
|
|
check_ignore_reason(event, 'cancel')
|
|
Booking.objects.first().delete()
|
|
check_ignore_reason(event, 'cancel')
|
|
Booking.objects.all().delete()
|
|
check_ignore_reason(event, None)
|
|
|
|
|
|
def test_event_end_datetime():
|
|
if connection.vendor != 'postgresql':
|
|
pytest.skip('postgresql required')
|
|
|
|
agenda = Agenda.objects.create(label='Meetings', kind='meetings')
|
|
meeting_type1 = MeetingType.objects.create(agenda=agenda, label='Foo', duration=60)
|
|
meeting_type2 = MeetingType.objects.create(agenda=agenda, label='Foo', duration=45)
|
|
|
|
event = Event.objects.create(start_datetime=now(), places=10, agenda=agenda)
|
|
|
|
try:
|
|
check_end_datetime(event, None)
|
|
except ProgrammingError:
|
|
pytest.skip('btree_Gist extension required')
|
|
|
|
event.meeting_type = meeting_type1
|
|
event.save()
|
|
check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=60))
|
|
event.meeting_type = meeting_type2
|
|
event.save()
|
|
check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))
|
|
event.start_datetime = now()
|
|
event.save()
|
|
check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))
|
|
event.meeting_type = None
|
|
event.save()
|
|
check_end_datetime(event, None)
|
|
event.meeting_type = meeting_type2
|
|
event.save()
|
|
check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))
|
|
|
|
event2 = Event.objects.create(start_datetime=now(), meeting_type=meeting_type1, places=10, agenda=agenda)
|
|
check_end_datetime(event2, event2.start_datetime + datetime.timedelta(minutes=60))
|
|
meeting_type1.duration = 42
|
|
meeting_type1.save()
|
|
check_end_datetime(event2, event2.start_datetime + datetime.timedelta(minutes=42))
|
|
check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))
|
|
|
|
|
|
def test_meeting_event_exclusion_constraint():
|
|
if connection.vendor != 'postgresql':
|
|
pytest.skip('postgresql required')
|
|
|
|
agenda = Agenda.objects.create(label='Meetings', kind='meetings')
|
|
meeting_type1 = MeetingType.objects.create(agenda=agenda, label='Foo 1', duration=60)
|
|
meeting_type2 = MeetingType.objects.create(agenda=agenda, label='Foo 2', duration=30)
|
|
desk1 = Desk.objects.create(agenda=agenda, label='Desk 1')
|
|
desk2 = Desk.objects.create(agenda=agenda, label='Desk 2')
|
|
|
|
# create an event
|
|
event1 = Event.objects.create(
|
|
start_datetime=now(), meeting_type=meeting_type1, places=10, agenda=agenda, desk=desk1
|
|
)
|
|
|
|
try:
|
|
check_ignore_reason(event1, None)
|
|
except ProgrammingError:
|
|
pytest.skip('btree_Gist extension required')
|
|
|
|
# create a event with the same date range on other desk => no error
|
|
Event.objects.create(
|
|
start_datetime=event1.start_datetime, meeting_type=meeting_type1, places=10, agenda=agenda, desk=desk2
|
|
)
|
|
# no check if no meeting_type set
|
|
Event.objects.create(start_datetime=event1.start_datetime, places=10, agenda=agenda, desk=desk1)
|
|
# no check if no desk set
|
|
Event.objects.create(
|
|
start_datetime=event1.start_datetime, meeting_type=meeting_type1, places=10, agenda=agenda
|
|
)
|
|
|
|
# create an event just after the first one => ok
|
|
Event.objects.create(
|
|
start_datetime=event1.start_datetime + datetime.timedelta(minutes=60),
|
|
meeting_type=meeting_type1,
|
|
places=10,
|
|
agenda=agenda,
|
|
desk=desk1,
|
|
)
|
|
|
|
# try to create an event with the same date range and the same desk => error
|
|
with pytest.raises(IntegrityError):
|
|
with transaction.atomic():
|
|
Event.objects.create(
|
|
start_datetime=event1.start_datetime,
|
|
meeting_type=meeting_type1,
|
|
places=10,
|
|
agenda=agenda,
|
|
desk=desk1,
|
|
)
|
|
# try to create an event with the same start date, other duration and the same desk => error
|
|
with pytest.raises(IntegrityError):
|
|
with transaction.atomic():
|
|
Event.objects.create(
|
|
start_datetime=event1.start_datetime,
|
|
meeting_type=meeting_type2,
|
|
places=10,
|
|
agenda=agenda,
|
|
desk=desk1,
|
|
)
|
|
# try to create an event overlaping the first one on the same desk => error
|
|
with pytest.raises(IntegrityError):
|
|
with transaction.atomic():
|
|
Event.objects.create(
|
|
start_datetime=event1.start_datetime + datetime.timedelta(minutes=10),
|
|
meeting_type=meeting_type1,
|
|
places=10,
|
|
agenda=agenda,
|
|
desk=desk1,
|
|
)
|
|
with pytest.raises(IntegrityError):
|
|
with transaction.atomic():
|
|
Event.objects.create(
|
|
start_datetime=event1.start_datetime + datetime.timedelta(minutes=10),
|
|
meeting_type=meeting_type2,
|
|
places=10,
|
|
agenda=agenda,
|
|
desk=desk1,
|
|
)
|
|
with pytest.raises(IntegrityError):
|
|
with transaction.atomic():
|
|
Event.objects.create(
|
|
start_datetime=event1.start_datetime - datetime.timedelta(minutes=10),
|
|
meeting_type=meeting_type1,
|
|
places=10,
|
|
agenda=agenda,
|
|
desk=desk1,
|
|
)
|
|
with pytest.raises(IntegrityError):
|
|
with transaction.atomic():
|
|
Event.objects.create(
|
|
start_datetime=event1.start_datetime - datetime.timedelta(minutes=10),
|
|
meeting_type=meeting_type2,
|
|
places=10,
|
|
agenda=agenda,
|
|
desk=desk1,
|
|
)
|
|
# but if event1 is cancelled it's ok
|
|
Booking.objects.create(event=event1, cancellation_datetime=now())
|
|
Event.objects.create(
|
|
start_datetime=event1.start_datetime, meeting_type=meeting_type1, places=10, agenda=agenda, desk=desk1
|
|
)
|
|
|
|
|
|
def test_clean_time_period_exceptions(transactional_db):
|
|
app = 'agendas'
|
|
|
|
migrate_from = [(app, '0065_unavailability_calendar')]
|
|
migrate_to = [(app, '0066_timeperiodexceptionsource_unique_settings_slug')]
|
|
executor = MigrationExecutor(connection)
|
|
old_apps = executor.loader.project_state(migrate_from).apps
|
|
executor.migrate(migrate_from)
|
|
|
|
Agenda = old_apps.get_model(app, 'Agenda')
|
|
Desk = old_apps.get_model(app, 'Desk')
|
|
TimePeriodException = old_apps.get_model(app, 'TimePeriodException')
|
|
TimePeriodExceptionSource = old_apps.get_model(app, 'TimePeriodExceptionSource')
|
|
|
|
agenda = Agenda.objects.create(label='Agenda')
|
|
desk = Desk.objects.create(label='Desk', slug='desk', agenda=agenda)
|
|
|
|
# add normal time period exception to Desk
|
|
source_desk = TimePeriodExceptionSource.objects.create(desk=desk, settings_slug='holidays', enabled=True)
|
|
start_datetime = datetime.datetime(year=2020, month=1, day=2)
|
|
end_datetime = datetime.datetime(year=2020, month=1, day=3)
|
|
for i in range(5):
|
|
TimePeriodException.objects.create(
|
|
desk=desk,
|
|
source=source_desk,
|
|
external=True,
|
|
start_datetime=start_datetime,
|
|
end_datetime=end_datetime,
|
|
)
|
|
|
|
# now simulate broken state (desk duplication)
|
|
new_desk = Desk.objects.create(label='New Desk', slug='new-desk', agenda=agenda)
|
|
|
|
# normal source and exceptions
|
|
source_new_desk = TimePeriodExceptionSource.objects.create(
|
|
desk=new_desk, settings_slug='holidays', enabled=True
|
|
)
|
|
for i in range(5):
|
|
TimePeriodException.objects.create(
|
|
desk=new_desk,
|
|
source=source_new_desk,
|
|
external=True,
|
|
start_datetime=start_datetime,
|
|
end_datetime=end_datetime,
|
|
)
|
|
|
|
# wrong duplicate of source
|
|
TimePeriodExceptionSource.objects.create(desk=new_desk, settings_slug='holidays', enabled=True)
|
|
|
|
# wrong duplicate of exceptions, referencing original desk source
|
|
for i in range(5):
|
|
TimePeriodException.objects.create(
|
|
desk=new_desk,
|
|
source=source_desk,
|
|
external=True,
|
|
start_datetime=start_datetime,
|
|
end_datetime=end_datetime,
|
|
)
|
|
|
|
# extra data that should not be touched
|
|
other_exception = TimePeriodException.objects.create(
|
|
desk=desk, start_datetime=start_datetime, end_datetime=end_datetime,
|
|
)
|
|
other_source = TimePeriodExceptionSource.objects.create(desk=desk, ics_file='test.ics')
|
|
# even if wrong desk, this exception is not from settings thus should not get removed
|
|
exception_from_ics = TimePeriodException.objects.create(
|
|
desk=new_desk, start_datetime=start_datetime, end_datetime=end_datetime, source=other_source,
|
|
)
|
|
|
|
# ensure migration fixes state
|
|
executor = MigrationExecutor(connection)
|
|
executor.migrate(migrate_to)
|
|
executor.loader.build_graph()
|
|
|
|
apps = executor.loader.project_state(migrate_to).apps
|
|
Desk = apps.get_model(app, 'Desk')
|
|
TimePeriodException = apps.get_model(app, 'TimePeriodException')
|
|
TimePeriodExceptionSource = apps.get_model(app, 'TimePeriodExceptionSource')
|
|
|
|
# original desk hasn't been touched
|
|
desk = Desk.objects.get(pk=desk.pk)
|
|
assert desk.timeperiodexception_set.filter(source__settings_slug='holidays').count() == 5
|
|
assert desk.timeperiodexceptionsource_set.filter(settings_slug='holidays').count() == 1
|
|
assert desk.timeperiodexception_set.filter(pk=other_exception.pk).exists()
|
|
assert desk.timeperiodexceptionsource_set.filter(pk=other_source.pk).exists()
|
|
|
|
# duplicated desk has correct exceptions
|
|
new_desk = Desk.objects.get(pk=new_desk.pk)
|
|
assert new_desk.timeperiodexception_set.filter(source__settings_slug='holidays').count() == 5
|
|
assert new_desk.timeperiodexceptionsource_set.filter(settings_slug='holidays').count() == 1
|
|
assert new_desk.timeperiodexception_set.filter(pk=exception_from_ics.pk).exists()
|
|
|
|
exc = new_desk.timeperiodexception_set.filter(source__settings_slug='holidays').first()
|
|
assert exc.source == new_desk.timeperiodexceptionsource_set.get(settings_slug='holidays')
|