"""Database-level and data-migration tests for the ``agendas`` application.

The first group of tests reads the raw ``_ignore_reason`` and ``_end_datetime``
columns of ``agendas_event`` through plain SQL; the second group runs selected
data migrations through ``MigrationExecutor`` and checks the resulting rows.
"""

import datetime

import pytest
from django.db import IntegrityError, ProgrammingError, connection, transaction
from django.db.migrations.executor import MigrationExecutor
from django.test import override_settings

from chrono.agendas.models import Agenda, Booking, Desk, Event, MeetingType
from chrono.utils.timezone import now

pytestmark = pytest.mark.django_db


def check_ignore_reason(event, value):
    """Assert that the stored ``_ignore_reason`` column of *event* equals *value*.

    The column is read with raw SQL, bypassing the ORM. The underlying query
    raises ``ProgrammingError`` if the database rejects it; callers use that
    to skip the test.
    """
    with connection.cursor() as cursor:
        cursor.execute('SELECT _ignore_reason FROM agendas_event WHERE id = %s', [event.pk])
        row = cursor.fetchone()
        assert row[0] == value


def set_ignore_reason(event, value):
    """Overwrite the ``_ignore_reason`` column of *event* with *value* using raw SQL."""
    with connection.cursor() as cursor:
        cursor.execute('UPDATE agendas_event SET _ignore_reason = %s WHERE id = %s', [value, event.pk])


def check_end_datetime(event, value):
    """Assert that the stored ``_end_datetime`` column of *event* equals *value*.

    The column is read with raw SQL, bypassing the ORM.
    """
    with connection.cursor() as cursor:
        cursor.execute('SELECT _end_datetime FROM agendas_event WHERE id = %s', [event.pk])
        row = cursor.fetchone()
        assert row[0] == value


def test_event_ignore_reason():
    """``_ignore_reason`` follows creation, cancellation and deletion of bookings.

    NOTE(review): the column is presumably maintained by database triggers;
    this test only observes its value after each ORM operation.
    """
    agenda = Agenda.objects.create(label='Meetings', kind='meetings')
    meeting_type = MeetingType.objects.create(agenda=agenda, label='Foo', duration=60)
    desk = Desk.objects.create(agenda=agenda, label='Desk')
    event = Event.objects.create(
        start_datetime=now(), meeting_type=meeting_type, places=10, agenda=agenda, desk=desk
    )
    # The first raw query doubles as a feature probe: if the database rejects
    # it, the rest of the test cannot run and is skipped.
    try:
        check_ignore_reason(event, None)
    except ProgrammingError:
        pytest.skip('btree_Gist extension required')

    # an active booking does not set a reason
    Booking.objects.create(event=event)
    check_ignore_reason(event, None)
    # a cancelled booking marks the event
    Booking.objects.create(event=event, cancellation_datetime=now())
    check_ignore_reason(event, 'cancel')
    # un-cancelling every booking clears the reason again
    Booking.objects.update(cancellation_datetime=None)
    check_ignore_reason(event, None)
    Booking.objects.update(cancellation_datetime=now())
    check_ignore_reason(event, 'cancel')
    # deleting bookings switches the reason to 'delete'
    Booking.objects.first().delete()
    check_ignore_reason(event, 'delete')
    Booking.objects.all().delete()
    check_ignore_reason(event, 'delete')


def test_event_end_datetime():
    """``_end_datetime`` equals start + meeting type duration, or NULL without meeting type."""
    agenda = Agenda.objects.create(label='Meetings', kind='meetings')
    meeting_type1 = MeetingType.objects.create(agenda=agenda, label='Foo', duration=60)
    meeting_type2 = MeetingType.objects.create(agenda=agenda, label='Foo', duration=45)
    event = Event.objects.create(start_datetime=now(), places=10, agenda=agenda)
    # Feature probe, same as in test_event_ignore_reason.
    try:
        check_end_datetime(event, None)
    except ProgrammingError:
        pytest.skip('btree_Gist extension required')

    # setting or changing the meeting type recomputes the end
    event.meeting_type = meeting_type1
    event.save()
    check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=60))
    event.meeting_type = meeting_type2
    event.save()
    check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))
    # moving the start moves the end
    event.start_datetime = now()
    event.save()
    check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))
    # removing the meeting type resets the end to NULL
    event.meeting_type = None
    event.save()
    check_end_datetime(event, None)
    event.meeting_type = meeting_type2
    event.save()
    check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))

    # changing a meeting type duration only affects events using that meeting type
    event2 = Event.objects.create(start_datetime=now(), meeting_type=meeting_type1, places=10, agenda=agenda)
    check_end_datetime(event2, event2.start_datetime + datetime.timedelta(minutes=60))
    meeting_type1.duration = 42
    meeting_type1.save()
    check_end_datetime(event2, event2.start_datetime + datetime.timedelta(minutes=42))
    check_end_datetime(event, event.start_datetime + datetime.timedelta(minutes=45))


def test_meeting_event_exclusion_constraint():
    """Overlapping meeting events on the same desk are rejected with ``IntegrityError``.

    Events without a meeting type, without a desk, on another desk, adjacent to
    the first one, or overlapping a cancelled event are accepted.
    """
    agenda = Agenda.objects.create(label='Meetings', kind='meetings')
    meeting_type1 = MeetingType.objects.create(agenda=agenda, label='Foo 1', duration=60)
    meeting_type2 = MeetingType.objects.create(agenda=agenda, label='Foo 2', duration=30)
    desk1 = Desk.objects.create(agenda=agenda, label='Desk 1')
    desk2 = Desk.objects.create(agenda=agenda, label='Desk 2')

    # create an event
    event1 = Event.objects.create(
        start_datetime=now(), meeting_type=meeting_type1, places=10, agenda=agenda, desk=desk1
    )
    # Feature probe, same as in test_event_ignore_reason.
    try:
        check_ignore_reason(event1, None)
    except ProgrammingError:
        pytest.skip('btree_Gist extension required')

    # create an event with the same date range on another desk => no error
    Event.objects.create(
        start_datetime=event1.start_datetime, meeting_type=meeting_type1, places=10, agenda=agenda, desk=desk2
    )
    # no check if no meeting_type set
    Event.objects.create(start_datetime=event1.start_datetime, places=10, agenda=agenda, desk=desk1)
    # no check if no desk set
    Event.objects.create(
        start_datetime=event1.start_datetime, meeting_type=meeting_type1, places=10, agenda=agenda
    )
    # create an event just after the first one => ok
    Event.objects.create(
        start_datetime=event1.start_datetime + datetime.timedelta(minutes=60),
        meeting_type=meeting_type1,
        places=10,
        agenda=agenda,
        desk=desk1,
    )

    # Each failing insert runs in its own atomic block so that the expected
    # IntegrityError does not break the surrounding test transaction.

    # try to create an event with the same date range and the same desk => error
    with pytest.raises(IntegrityError):
        with transaction.atomic():
            Event.objects.create(
                start_datetime=event1.start_datetime,
                meeting_type=meeting_type1,
                places=10,
                agenda=agenda,
                desk=desk1,
            )
    # try to create an event with the same start date, other duration and the same desk => error
    with pytest.raises(IntegrityError):
        with transaction.atomic():
            Event.objects.create(
                start_datetime=event1.start_datetime,
                meeting_type=meeting_type2,
                places=10,
                agenda=agenda,
                desk=desk1,
            )
    # try to create an event overlapping the first one on the same desk => error
    with pytest.raises(IntegrityError):
        with transaction.atomic():
            Event.objects.create(
                start_datetime=event1.start_datetime + datetime.timedelta(minutes=10),
                meeting_type=meeting_type1,
                places=10,
                agenda=agenda,
                desk=desk1,
            )
    with pytest.raises(IntegrityError):
        with transaction.atomic():
            Event.objects.create(
                start_datetime=event1.start_datetime + datetime.timedelta(minutes=10),
                meeting_type=meeting_type2,
                places=10,
                agenda=agenda,
                desk=desk1,
            )
    with pytest.raises(IntegrityError):
        with transaction.atomic():
            Event.objects.create(
                start_datetime=event1.start_datetime - datetime.timedelta(minutes=10),
                meeting_type=meeting_type1,
                places=10,
                agenda=agenda,
                desk=desk1,
            )
    with pytest.raises(IntegrityError):
        with transaction.atomic():
            Event.objects.create(
                start_datetime=event1.start_datetime - datetime.timedelta(minutes=10),
                meeting_type=meeting_type2,
                places=10,
                agenda=agenda,
                desk=desk1,
            )

    # but if event1 is cancelled it's ok
    Booking.objects.create(event=event1, cancellation_datetime=now())
    Event.objects.create(
        start_datetime=event1.start_datetime, meeting_type=meeting_type1, places=10, agenda=agenda, desk=desk1
    )


def test_clean_time_period_exceptions():
    """Migration 0066 cleans up duplicated settings-based sources and exceptions.

    A desk and a broken copy of it are created on the 0065 schema; after
    migrating to 0066 each desk must own exactly one ``holidays`` source with
    five exceptions, and unrelated data must still be present.
    """
    app = 'agendas'

    migrate_from = [(app, '0065_unavailability_calendar')]
    migrate_to = [(app, '0066_timeperiodexceptionsource_unique_settings_slug')]
    executor = MigrationExecutor(connection)
    old_apps = executor.loader.project_state(migrate_from).apps
    executor.migrate(migrate_from)

    # Historical models matching the pre-migration schema; they shadow the
    # module-level imports on purpose.
    Agenda = old_apps.get_model(app, 'Agenda')
    Desk = old_apps.get_model(app, 'Desk')
    TimePeriodException = old_apps.get_model(app, 'TimePeriodException')
    TimePeriodExceptionSource = old_apps.get_model(app, 'TimePeriodExceptionSource')

    agenda = Agenda.objects.create(label='Agenda')
    desk = Desk.objects.create(label='Desk', slug='desk', agenda=agenda)

    # add normal time period exception to Desk
    source_desk = TimePeriodExceptionSource.objects.create(desk=desk, settings_slug='holidays', enabled=True)
    start_datetime = datetime.datetime(year=2020, month=1, day=2)
    end_datetime = datetime.datetime(year=2020, month=1, day=3)
    for _ in range(5):
        TimePeriodException.objects.create(
            desk=desk,
            source=source_desk,
            external=True,
            start_datetime=start_datetime,
            end_datetime=end_datetime,
        )

    # now simulate broken state (desk duplication)
    new_desk = Desk.objects.create(label='New Desk', slug='new-desk', agenda=agenda)

    # normal source and exceptions
    source_new_desk = TimePeriodExceptionSource.objects.create(
        desk=new_desk, settings_slug='holidays', enabled=True
    )
    for _ in range(5):
        TimePeriodException.objects.create(
            desk=new_desk,
            source=source_new_desk,
            external=True,
            start_datetime=start_datetime,
            end_datetime=end_datetime,
        )

    # wrong duplicate of source
    TimePeriodExceptionSource.objects.create(desk=new_desk, settings_slug='holidays', enabled=True)
    # wrong duplicate of exceptions, referencing original desk source
    for _ in range(5):
        TimePeriodException.objects.create(
            desk=new_desk,
            source=source_desk,
            external=True,
            start_datetime=start_datetime,
            end_datetime=end_datetime,
        )

    # extra data that should not be touched
    other_exception = TimePeriodException.objects.create(
        desk=desk,
        start_datetime=start_datetime,
        end_datetime=end_datetime,
    )
    other_source = TimePeriodExceptionSource.objects.create(desk=desk, ics_file='test.ics')
    # even if wrong desk, this exception is not from settings thus should not get removed
    exception_from_ics = TimePeriodException.objects.create(
        desk=new_desk,
        start_datetime=start_datetime,
        end_datetime=end_datetime,
        source=other_source,
    )

    # ensure migration fixes state
    executor = MigrationExecutor(connection)
    executor.migrate(migrate_to)
    executor.loader.build_graph()

    # Re-resolve the models from the post-migration state.
    apps = executor.loader.project_state(migrate_to).apps
    Desk = apps.get_model(app, 'Desk')
    TimePeriodException = apps.get_model(app, 'TimePeriodException')
    TimePeriodExceptionSource = apps.get_model(app, 'TimePeriodExceptionSource')

    # original desk hasn't been touched
    desk = Desk.objects.get(pk=desk.pk)
    assert desk.timeperiodexception_set.filter(source__settings_slug='holidays').count() == 5
    assert desk.timeperiodexceptionsource_set.filter(settings_slug='holidays').count() == 1
    assert desk.timeperiodexception_set.filter(pk=other_exception.pk).exists()
    assert desk.timeperiodexceptionsource_set.filter(pk=other_source.pk).exists()

    # duplicated desk has correct exceptions
    new_desk = Desk.objects.get(pk=new_desk.pk)
    assert new_desk.timeperiodexception_set.filter(source__settings_slug='holidays').count() == 5
    assert new_desk.timeperiodexceptionsource_set.filter(settings_slug='holidays').count() == 1
    assert new_desk.timeperiodexception_set.filter(pk=exception_from_ics.pk).exists()

    exc = new_desk.timeperiodexception_set.filter(source__settings_slug='holidays').first()
    assert exc.source == new_desk.timeperiodexceptionsource_set.get(settings_slug='holidays')


@override_settings(LANGUAGE_CODE='fr-fr')
def test_translate_holidays_exceptions():
    """Migration 0069 replaces English holiday labels with their French translation.

    The test runs with ``LANGUAGE_CODE='fr-fr'`` and expects the two exceptions
    attached to the ``holidays`` source to be relabelled.
    """
    app = 'agendas'

    migrate_from = [(app, '0068_remove_timeperiodexception_external')]
    migrate_to = [(app, '0069_translate_holidays')]
    executor = MigrationExecutor(connection)
    old_apps = executor.loader.project_state(migrate_from).apps
    executor.migrate(migrate_from)

    # Historical models matching the pre-migration schema.
    Agenda = old_apps.get_model(app, 'Agenda')
    Desk = old_apps.get_model(app, 'Desk')
    TimePeriodException = old_apps.get_model(app, 'TimePeriodException')
    TimePeriodExceptionSource = old_apps.get_model(app, 'TimePeriodExceptionSource')

    agenda = Agenda.objects.create(label='Agenda')
    desk = Desk.objects.create(label='Desk', slug='desk', agenda=agenda)
    source_desk = TimePeriodExceptionSource.objects.create(desk=desk, settings_slug='holidays')
    start_datetime = datetime.datetime(year=2020, month=1, day=2)
    end_datetime = datetime.datetime(year=2020, month=1, day=3)
    TimePeriodException.objects.create(
        desk=desk,
        source=source_desk,
        label='All Saints Day',
        start_datetime=start_datetime,
        end_datetime=end_datetime,
    )
    TimePeriodException.objects.create(
        desk=desk,
        source=source_desk,
        label='New year',
        start_datetime=start_datetime,
        end_datetime=end_datetime,
    )

    executor = MigrationExecutor(connection)
    executor.migrate(migrate_to)
    executor.loader.build_graph()

    # Re-resolve the model from the post-migration state.
    apps = executor.loader.project_state(migrate_to).apps
    Desk = apps.get_model(app, 'Desk')

    desk = Desk.objects.get(slug='desk')
    assert not desk.timeperiodexception_set.filter(label='All Saints Day').exists()
    assert not desk.timeperiodexception_set.filter(label='New year').exists()
    assert desk.timeperiodexception_set.filter(label='Toussaint').count() == 1
    assert desk.timeperiodexception_set.filter(label='Jour de l’An').count() == 1


def test_migration_convert_week_days():
    """Migration 0157 shifts every stored week day index by one.

    Both ``Event.recurrence_days`` and ``SharedCustodyRule.days`` are checked;
    ``None`` and empty lists must be left unchanged.
    """
    app = 'agendas'

    migrate_from = [(app, '0156_update_dow_index')]
    migrate_to = [(app, '0157_convert_week_days')]
    executor = MigrationExecutor(connection)
    old_apps = executor.loader.project_state(migrate_from).apps
    executor.migrate(migrate_from)

    # Historical models matching the pre-migration schema.
    Agenda = old_apps.get_model(app, 'Agenda')
    Event = old_apps.get_model(app, 'Event')
    SharedCustodyRule = old_apps.get_model(app, 'SharedCustodyRule')
    SharedCustodyAgenda = old_apps.get_model(app, 'SharedCustodyAgenda')
    Person = old_apps.get_model(app, 'Person')

    agenda = Agenda.objects.create(label='Foo', kind='events')
    Event.objects.create(recurrence_days=None, start_datetime=now(), places=1, agenda=agenda, slug='none')
    Event.objects.create(recurrence_days=[], start_datetime=now(), places=1, agenda=agenda, slug='empty')
    Event.objects.create(recurrence_days=[3], start_datetime=now(), places=1, agenda=agenda, slug='[3]')
    Event.objects.create(recurrence_days=[0, 6], start_datetime=now(), places=1, agenda=agenda, slug='[0, 6]')
    Event.objects.create(
        recurrence_days=[0, 1, 2, 3, 4, 5, 6], start_datetime=now(), places=1, agenda=agenda, slug='all'
    )

    father = Person.objects.create(user_external_id='father_id', first_name='John', last_name='Doe')
    mother = Person.objects.create(user_external_id='mother_id', first_name='Jane', last_name='Doe')
    child = Person.objects.create(user_external_id='xxx', first_name='James', last_name='Doe')
    agenda = SharedCustodyAgenda.objects.create(
        first_guardian=father, second_guardian=mother, child=child, date_start=now()
    )
    SharedCustodyRule.objects.create(days=[0, 4, 6], weeks='', guardian=father, agenda=agenda)

    executor = MigrationExecutor(connection)
    executor.migrate(migrate_to)
    executor.loader.build_graph()

    # Re-resolve BOTH models from the post-migration state: asserting through the
    # historical classes obtained before migrating would check the new data
    # against stale model definitions.
    apps = executor.loader.project_state(migrate_to).apps
    Event = apps.get_model(app, 'Event')
    SharedCustodyRule = apps.get_model(app, 'SharedCustodyRule')

    assert Event.objects.get(slug='none').recurrence_days is None
    assert Event.objects.get(slug='empty').recurrence_days == []
    assert Event.objects.get(slug='[3]').recurrence_days == [4]
    assert Event.objects.get(slug='[0, 6]').recurrence_days == [1, 7]
    assert Event.objects.get(slug='all').recurrence_days == [1, 2, 3, 4, 5, 6, 7]

    assert SharedCustodyRule.objects.get().days == [1, 5, 7]


def test_migration_booking_check_data():
    """Migration 0162 copies booking check fields to the related ``user_check`` object.

    A booking that was never checked must end up without a ``user_check``;
    checked bookings must expose presence, check type and check times on it,
    while the computed times stay on the booking itself.
    """
    app = 'agendas'

    migrate_from = [(app, '0161_add_booking_check_model')]
    migrate_to = [(app, '0162_migrate_booking_check_data')]
    executor = MigrationExecutor(connection)
    old_apps = executor.loader.project_state(migrate_from).apps
    executor.migrate(migrate_from)

    # Historical models matching the pre-migration schema.
    Agenda = old_apps.get_model(app, 'Agenda')
    Event = old_apps.get_model(app, 'Event')
    Booking = old_apps.get_model(app, 'Booking')

    agenda = Agenda.objects.create(label='Foo', kind='events')
    event = Event.objects.create(start_datetime=now(), places=10, agenda=agenda, slug='event')

    not_checked = Booking.objects.create(event=event)
    present = Booking.objects.create(event=event, user_was_present=True)
    absent = Booking.objects.create(event=event, user_was_present=False)
    with_check_type = Booking.objects.create(
        event=event,
        user_was_present=False,
        user_check_type_slug='xxx',
        user_check_type_label='XXX',
        user_check_start_time=datetime.time(12, 0),
        user_check_end_time=datetime.time(14, 0),
        computed_start_time=datetime.time(12, 30),
        computed_end_time=datetime.time(14, 30),
    )

    executor = MigrationExecutor(connection)
    executor.migrate(migrate_to)
    executor.loader.build_graph()

    # Re-resolve the model from the post-migration state.
    apps = executor.loader.project_state(migrate_to).apps
    Booking = apps.get_model(app, 'Booking')

    assert not hasattr(Booking.objects.get(pk=not_checked.pk), 'user_check')
    assert Booking.objects.get(pk=present.pk).user_check.presence is True
    assert Booking.objects.get(pk=absent.pk).user_check.presence is False

    with_check_type = Booking.objects.get(pk=with_check_type.pk)
    assert with_check_type.user_check.presence is False
    assert with_check_type.user_check.type_slug == 'xxx'
    assert with_check_type.user_check.type_label == 'XXX'
    assert with_check_type.user_check.start_time == datetime.time(12, 0)
    assert with_check_type.user_check.end_time == datetime.time(14, 0)
    assert with_check_type.computed_start_time == datetime.time(12, 30)
    assert with_check_type.computed_end_time == datetime.time(14, 30)