import collections import datetime from django.contrib.auth.models import Group from django.db import models, transaction from django.db.models import ExpressionWrapper, F from django.utils.translation import gettext_lazy as _ from rest_framework import serializers from rest_framework.exceptions import ValidationError from chrono.agendas.models import ( Agenda, Booking, BookingColor, Category, Event, EventsType, Person, SharedCustodyAgenda, SharedCustodyHolidayRule, SharedCustodyRule, Subscription, TimePeriodExceptionGroup, ) from chrono.utils.lingo import get_agenda_check_types def get_objects_from_slugs(slugs, qs): slugs = set(slugs) objects = qs.filter(slug__in=slugs) if len(objects) != len(slugs): unknown_slugs = sorted(slugs - {obj.slug for obj in objects}) unknown_slugs = ', '.join(unknown_slugs) raise ValidationError(('invalid slugs: %s') % unknown_slugs) return objects class StringOrListField(serializers.ListField): def to_internal_value(self, data): if isinstance(data, str): data = [s.strip() for s in data.split(',') if s.strip()] return super().to_internal_value(data) class CommaSeparatedStringField(serializers.ListField): def get_value(self, dictionary): return super(serializers.ListField, self).get_value(dictionary) def to_internal_value(self, data): data = [s.strip() for s in data.split(',') if s.strip()] return super().to_internal_value(data) class DateRangeMixin(metaclass=serializers.SerializerMetaclass): datetime_formats = ['%Y-%m-%d', '%Y-%m-%d %H:%M', 'iso-8601'] date_start = serializers.DateTimeField(required=False, input_formats=datetime_formats) date_end = serializers.DateTimeField(required=False, input_formats=datetime_formats) class AgendaSlugsMixin(metaclass=serializers.SerializerMetaclass): agendas = CommaSeparatedStringField( required=False, child=serializers.SlugField(max_length=160, allow_blank=False) ) def get_agenda_qs(self): return Agenda.objects.filter(kind='events').select_related('events_type') class FillSlotSerializer(serializers.Serializer): label = serializers.CharField(max_length=250, allow_blank=True) user_external_id = serializers.CharField(max_length=250, allow_blank=True) user_name = serializers.CharField(max_length=250, allow_blank=True) # compatibility user_first_name = serializers.CharField(max_length=250, allow_blank=True) user_last_name = serializers.CharField(max_length=250, allow_blank=True) user_display_label = serializers.CharField(max_length=250, allow_blank=True) user_email = serializers.CharField(max_length=250, allow_blank=True) user_phone_number = serializers.CharField(max_length=16, allow_blank=True) exclude_user = serializers.BooleanField(default=False) events = serializers.CharField(max_length=16, allow_blank=True) bypass_delays = serializers.BooleanField(default=False) form_url = serializers.CharField(max_length=250, allow_blank=True) backoffice_url = serializers.URLField(allow_blank=True) cancel_callback_url = serializers.URLField(allow_blank=True) count = serializers.IntegerField(min_value=1) cancel_booking_id = serializers.CharField(max_length=250, allow_blank=True, allow_null=True) force_waiting_list = serializers.BooleanField(default=False) use_color_for = serializers.CharField(max_length=250, allow_blank=True) extra_emails = StringOrListField( required=False, child=serializers.EmailField(max_length=250, allow_blank=False) ) extra_phone_numbers = StringOrListField( required=False, child=serializers.CharField(max_length=16, allow_blank=False) ) check_overlaps = serializers.BooleanField(default=False) class SlotsSerializer(serializers.Serializer): slots = StringOrListField(required=True, child=serializers.CharField(max_length=160, allow_blank=False)) def validate(self, attrs): super().validate(attrs) if not attrs.get('slots'): raise serializers.ValidationError({'slots': _('This field is required.')}) return attrs class FillSlotsSerializer(FillSlotSerializer, SlotsSerializer): pass class EventsFillSlotsSerializer(FillSlotSerializer): slots = StringOrListField(required=True, child=serializers.CharField(max_length=160)) def validate(self, attrs): super().validate(attrs) if 'slots' not in attrs: raise serializers.ValidationError({'slots': _('This field is required.')}) if not attrs.get('user_external_id'): raise serializers.ValidationError({'user_external_id': _('This field is required.')}) if 'exclude_user' in attrs: raise serializers.ValidationError({'exclude_user': _('This parameter is not supported.')}) return attrs class MultipleAgendasEventsFillSlotsSerializer(EventsFillSlotsSerializer): def validate_slots(self, value): allowed_agenda_slugs = self.context['allowed_agenda_slugs'] slots_agenda_slugs = set() for slot in value: try: agenda_slug, event_slug = slot.split('@') except ValueError: raise ValidationError(_('Invalid format for slot %s') % slot) if not agenda_slug: raise ValidationError(_('Missing agenda slug in slot %s') % slot) if not event_slug: raise ValidationError(_('Missing event slug in slot %s') % slot) slots_agenda_slugs.add(agenda_slug) extra_agendas = slots_agenda_slugs - set(allowed_agenda_slugs) if extra_agendas: extra_agendas = ', '.join(sorted(extra_agendas)) raise ValidationError(_('Events from the following agendas cannot be booked: %s') % extra_agendas) return value class MultipleAgendasEventsCheckStatusSerializer(AgendaSlugsMixin, DateRangeMixin, serializers.Serializer): user_external_id = serializers.CharField(required=False, max_length=250, allow_blank=False) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) for field in ['agendas', 'user_external_id', 'date_start', 'date_end']: self.fields[field].required = True def validate_agendas(self, value): return get_objects_from_slugs(value, qs=self.get_agenda_qs()) class RecurringFillslotsSerializer(MultipleAgendasEventsFillSlotsSerializer): include_booked_events_detail = serializers.BooleanField(default=False) def validate_slots(self, value): super().validate_slots(value) self.initial_slots = value open_event_slugs = collections.defaultdict(set) for agenda in self.context['agendas']: for event in agenda.get_open_recurring_events(): open_event_slugs[agenda.slug].add(event.slug) slots = collections.defaultdict(lambda: collections.defaultdict(list)) for slot in value: try: slugs, day = slot.split(':') day = int(day) except ValueError: raise ValidationError(_('invalid slot: %s') % slot) agenda_slug, event_slug = slugs.split('@') if event_slug not in open_event_slugs[agenda_slug]: raise ValidationError( _('event %(event_slug)s of agenda %(agenda_slug)s is not bookable') % {'event_slug': event_slug, 'agenda_slug': agenda_slug} ) # convert ISO day number to db lookup day number day = (day + 1) % 7 + 1 slots[agenda_slug][event_slug].append(day) return slots class BookingSerializer(serializers.ModelSerializer): user_absence_reason = serializers.CharField(required=False, allow_blank=True, allow_null=True) user_presence_reason = serializers.CharField(required=False, allow_blank=True, allow_null=True) use_color_for = serializers.CharField(required=False, allow_blank=True, allow_null=True, source='color') class Meta: model = Booking fields = [ 'id', 'in_waiting_list', 'user_first_name', 'user_last_name', 'user_email', 'user_phone_number', 'user_was_present', 'user_absence_reason', 'user_presence_reason', 'use_color_for', 'extra_data', 'creation_datetime', 'cancellation_datetime', 'label', ] read_only_fields = [ 'id', 'in_waiting_list', 'extra_data', 'creation_datetime', 'cancellation_datetime', ] def to_internal_value(self, data): if 'color' in data: # legacy data['use_color_for'] = data['color'] del data['color'] return super().to_internal_value(data) def to_representation(self, instance): ret = super().to_representation(instance) if self.instance.event.agenda.kind != 'events': ret['desk'] = { 'slug': self.instance.event.desk.slug, 'label': self.instance.event.desk.label, } ret.pop('user_was_present', None) ret.pop('user_absence_reason', None) ret.pop('user_presence_reason', None) else: ret['user_absence_reason'] = ( self.instance.user_check_type_slug if self.instance.user_was_present is False else None ) ret['user_presence_reason'] = ( self.instance.user_check_type_slug if self.instance.user_was_present is True else None ) return ret def _validate_check_type(self, kind, value): if not value: return None error_messages = { 'absence': _('unknown absence reason'), 'presence': _('unknown presence reason'), } check_types = get_agenda_check_types(self.instance.event.agenda) for check_type in check_types: if check_type.kind != kind: continue if value in [check_type.slug, check_type.label]: return check_type raise serializers.ValidationError(error_messages[kind]) def validate_user_absence_reason(self, value): return self._validate_check_type('absence', value) def validate_user_presence_reason(self, value): return self._validate_check_type('presence', value) def validate_use_color_for(self, value): if value: return BookingColor.objects.get_or_create(label=value)[0] def validate(self, attrs): super().validate(attrs) if 'user_absence_reason' in attrs and 'user_presence_reason' in attrs: raise ValidationError( {'user_absence_reason': _('can not set user_absence_reason and user_presence_reason')} ) if 'user_absence_reason' in attrs: check_type = attrs['user_absence_reason'] attrs['user_check_type_slug'] = check_type.slug if check_type else None attrs['user_check_type_label'] = check_type.label if check_type else None del attrs['user_absence_reason'] elif 'user_presence_reason' in attrs: check_type = attrs['user_presence_reason'] attrs['user_check_type_slug'] = check_type.slug if check_type else None attrs['user_check_type_label'] = check_type.label if check_type else None del attrs['user_presence_reason'] return attrs class ResizeSerializer(serializers.Serializer): count = serializers.IntegerField(min_value=1) class StatisticsFiltersSerializer(serializers.Serializer): time_interval = serializers.ChoiceField(choices=('day', _('Day')), default='day') start = serializers.DateTimeField(required=False, input_formats=['iso-8601', '%Y-%m-%d']) end = serializers.DateTimeField(required=False, input_formats=['iso-8601', '%Y-%m-%d']) category = serializers.SlugField(required=False, allow_blank=False, max_length=256) agenda = serializers.CharField(required=False, allow_blank=False, max_length=256) group_by = serializers.ListField( required=False, child=serializers.SlugField(allow_blank=False, max_length=256) ) class DateRangeSerializer(DateRangeMixin, serializers.Serializer): pass class DatetimesSerializer(DateRangeSerializer): min_places = serializers.IntegerField(min_value=1, default=1) user_external_id = serializers.CharField(required=False, max_length=250, allow_blank=True) exclude_user_external_id = serializers.CharField(required=False, max_length=250, allow_blank=True) events = serializers.CharField(required=False, max_length=32, allow_blank=True) hide_disabled = serializers.BooleanField(default=False) bypass_delays = serializers.BooleanField(default=False) def validate(self, attrs): super().validate(attrs) if ( 'user_external_id' in attrs and 'exclude_user_external_id' in attrs and attrs['user_external_id'] != attrs['exclude_user_external_id'] ): raise ValidationError( {'user_external_id': _('user_external_id and exclude_user_external_id have different values')} ) return attrs class AgendaOrSubscribedSlugsMixin(AgendaSlugsMixin): subscribed = CommaSeparatedStringField( required=False, child=serializers.SlugField(max_length=160, allow_blank=False) ) user_external_id = serializers.CharField(required=False, max_length=250, allow_blank=False) guardian_external_id = serializers.CharField(required=False, max_length=250, allow_blank=True) def validate(self, attrs): super().validate(attrs) if 'agendas' not in attrs and 'subscribed' not in attrs: raise ValidationError(_('Either "agendas" or "subscribed" parameter is required.')) if 'agendas' in attrs and 'subscribed' in attrs: raise ValidationError(_('"agendas" and "subscribed" parameters are mutually exclusive.')) user_external_id = attrs.get('user_external_id', self.context.get('user_external_id')) if 'subscribed' in attrs and not user_external_id: raise ValidationError( {'user_external_id': _('This field is required when using "subscribed" parameter.')} ) if attrs.get('guardian_external_id') and not user_external_id: raise serializers.ValidationError( {'user_external_id': _('This field is required when using "guardian_external_id" parameter.')} ) if 'subscribed' in attrs: lookups = {'subscriptions__user_external_id': user_external_id} if 'date_start' in attrs: # subscription must end after requested date_start lookups['subscriptions__date_end__gt'] = attrs['date_start'] if 'date_end' in attrs: # subscription must start before requested date end lookups['subscriptions__date_start__lte'] = attrs['date_end'] agendas = self.get_agenda_qs().filter(**lookups).distinct().select_related('category') if attrs['subscribed'] != ['all']: agendas = agendas.filter(category__slug__in=attrs['subscribed']) attrs['agendas'] = agendas attrs['agenda_slugs'] = [agenda.slug for agenda in agendas] else: attrs['agenda_slugs'] = self.agenda_slugs return attrs def validate_agendas(self, value): self.agenda_slugs = value return get_objects_from_slugs(value, qs=self.get_agenda_qs()) class MultipleAgendasDatetimesSerializer(AgendaOrSubscribedSlugsMixin, DatetimesSerializer): show_past_events = serializers.BooleanField(default=False) with_status = serializers.BooleanField(default=False) check_overlaps = serializers.BooleanField(default=False) def validate(self, attrs): super().validate(attrs) user_external_id = attrs.get('user_external_id') if attrs.get('with_status') and not user_external_id: raise ValidationError( {'user_external_id': _('This field is required when using "with_status" parameter.')} ) return attrs class AgendaOrSubscribedSlugsSerializer(AgendaOrSubscribedSlugsMixin, DateRangeMixin, serializers.Serializer): pass class RecurringFillslotsQueryStringSerializer(AgendaOrSubscribedSlugsSerializer): action = serializers.ChoiceField(required=True, choices=['update', 'book', 'unbook']) class RecurringEventsListSerializer(AgendaOrSubscribedSlugsSerializer): sort = serializers.ChoiceField(required=False, choices=['day']) check_overlaps = serializers.BooleanField(default=False) class EventSerializer(serializers.ModelSerializer): recurrence_days = StringOrListField( required=False, child=serializers.IntegerField(min_value=0, max_value=6) ) primary_event = serializers.SlugRelatedField(read_only=True, slug_field='slug') agenda = serializers.SlugRelatedField(read_only=True, slug_field='slug') class Meta: model = Event fields = [ 'start_datetime', 'recurrence_days', 'recurrence_week_interval', 'recurrence_end_date', 'duration', 'publication_datetime', 'places', 'waiting_list_places', 'label', 'slug', 'description', 'pricing', 'url', 'primary_event', 'agenda', ] read_only_fields = ['slug'] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if not self.instance.agenda.events_type: return field_classes = { 'text': serializers.CharField, 'textarea': serializers.CharField, 'bool': serializers.NullBooleanField, } field_options = { 'text': {'allow_blank': True}, 'textarea': {'allow_blank': True}, } for custom_field in self.instance.agenda.events_type.get_custom_fields(): field_class = field_classes[custom_field['field_type']] field_name = 'custom_field_%s' % custom_field['varname'] self.fields[field_name] = field_class( required=False, read_only=self.instance.primary_event is not None, **(field_options.get(custom_field['field_type']) or {}), ) def validate(self, attrs): if not self.instance.agenda.events_type: return attrs if self.instance.primary_event: return attrs defaults = { 'text': '', 'textarea': '', 'bool': None, } custom_fields = self.instance.custom_fields for custom_field in self.instance.agenda.events_type.get_custom_fields(): varname = custom_field['varname'] field_name = 'custom_field_%s' % varname if varname not in custom_fields: # set default custom_fields[varname] = defaults[custom_field['field_type']] if field_name in attrs: custom_fields[varname] = attrs[field_name] del attrs[field_name] attrs['custom_fields'] = custom_fields return attrs def to_representation(self, instance): ret = super().to_representation(instance) if not self.instance.agenda.events_type: return ret defaults = { 'text': '', 'textarea': '', 'bool': None, } custom_fields = self.instance.custom_fields for custom_field in self.instance.agenda.events_type.get_custom_fields(): varname = custom_field['varname'] field_name = 'custom_field_%s' % varname value = defaults[custom_field['field_type']] if varname in custom_fields: value = custom_fields[varname] ret[field_name] = value return ret class AgendaSerializer(serializers.ModelSerializer): edit_role = serializers.CharField(required=False, max_length=150) view_role = serializers.CharField(required=False, max_length=150) category = serializers.SlugField(required=False, max_length=160) events_type = serializers.SlugField(required=False, max_length=160) class Meta: model = Agenda fields = [ 'slug', 'label', 'kind', 'minimal_booking_delay', 'minimal_booking_delay_in_working_days', 'maximal_booking_delay', 'anonymize_delay', 'edit_role', 'view_role', 'category', 'events_type', ] def get_role(self, value): try: return Group.objects.get(name=value) except Group.DoesNotExist: raise serializers.ValidationError(_('unknown role: %s' % value)) def validate_edit_role(self, value): return self.get_role(value) def validate_view_role(self, value): return self.get_role(value) def validate_category(self, value): try: return Category.objects.get(slug=value) except Category.DoesNotExist: raise serializers.ValidationError(_('unknown category: %s' % value)) def validate_events_type(self, value): try: return EventsType.objects.get(slug=value) except EventsType.DoesNotExist: raise serializers.ValidationError(_('unknown events type: %s' % value)) def validate(self, attrs): super().validate(attrs) if attrs.get('minimal_booking_delay_in_working_days') and attrs.get('kind', 'events') != 'events': raise ValidationError( { 'minimal_booking_delay_in_working_days': _('Option not available on %s agenda') % attrs['kind'] } ) if attrs.get('events_type') and attrs.get('kind', 'events') != 'events': raise ValidationError({'events_type': _('Option not available on %s agenda') % attrs['kind']}) return attrs class SubscriptionSerializer(serializers.ModelSerializer): class Meta: model = Subscription fields = [ 'id', 'user_external_id', 'user_first_name', 'user_last_name', 'user_email', 'user_phone_number', 'date_start', 'date_end', 'extra_data', ] read_only_fields = ['id', 'extra_data'] def validate(self, attrs): super().validate(attrs) if attrs.get('date_start') and attrs.get('date_end') and attrs['date_start'] > attrs['date_end']: raise ValidationError(_('start_datetime must be before end_datetime')) return attrs class SharedCustodyAgendaCreateSerializer(serializers.Serializer): period_mirrors = { 'even': 'odd', 'odd': 'even', 'first-half': 'second-half', 'second-half': 'first-half', 'first-and-third-quarters': 'second-and-fourth-quarters', 'second-and-fourth-quarters': 'first-and-third-quarters', } guardian_first_name = serializers.CharField(max_length=250) guardian_last_name = serializers.CharField(max_length=250) guardian_id = serializers.CharField(max_length=250) other_guardian_first_name = serializers.CharField(max_length=250) other_guardian_last_name = serializers.CharField(max_length=250) other_guardian_id = serializers.CharField(max_length=250) child_first_name = serializers.CharField(max_length=250) child_last_name = serializers.CharField(max_length=250) child_id = serializers.CharField(max_length=250) weeks = serializers.ChoiceField(required=False, choices=['', 'even', 'odd']) date_start = serializers.DateField(required=True) settings_url = serializers.SerializerMethodField() def validate(self, attrs): attrs['holidays'] = collections.defaultdict(dict) for key, value in self.initial_data.items(): if key in attrs or ':' not in key: continue holiday_slug, field = key.split(':') if field not in ('periodicity', 'years'): raise ValidationError( _('Unknown parameter for holiday %(holiday)s: %(param)s') % {'holiday': holiday_slug, 'param': field} ) attrs['holidays'][holiday_slug][field] = value for holiday_slug in attrs['holidays'].copy(): try: holiday = TimePeriodExceptionGroup.objects.get(slug=holiday_slug) except TimePeriodExceptionGroup.DoesNotExist: raise ValidationError(_('Unknown holiday: %s') % holiday_slug) field_values = attrs['holidays'].pop(holiday_slug) if 'periodicity' not in field_values: raise ValidationError(_('Missing periodicity for holiday: %s') % holiday_slug) holidays = holiday.exceptions.annotate( delta=ExpressionWrapper( F('end_datetime') - F('start_datetime'), output_field=models.DurationField() ) ) is_short_holiday = holidays.filter(delta__lt=datetime.timedelta(days=28)).exists() if 'quarter' in field_values['periodicity'] and is_short_holiday: raise ValidationError(_('Short holidays cannot be cut into quarters.')) attrs['holidays'][holiday] = field_values return attrs @transaction.atomic def create(self, validated_data): guardian, dummy = Person.objects.get_or_create( user_external_id=validated_data['guardian_id'], defaults={ 'first_name': validated_data['guardian_first_name'], 'last_name': validated_data['guardian_last_name'], }, ) other_guardian, dummy = Person.objects.get_or_create( user_external_id=validated_data['other_guardian_id'], defaults={ 'first_name': validated_data['other_guardian_first_name'], 'last_name': validated_data['other_guardian_last_name'], }, ) child, dummy = Person.objects.get_or_create( user_external_id=validated_data['child_id'], defaults={ 'first_name': validated_data['child_first_name'], 'last_name': validated_data['child_last_name'], }, ) self.agenda = SharedCustodyAgenda.objects.create( first_guardian=guardian, second_guardian=other_guardian, child=child, date_start=validated_data['date_start'], ) if validated_data.get('weeks'): self.create_custody_rules(guardian, validated_data['weeks'], create_mirror_for=other_guardian) for holiday, params in validated_data.get('holidays', {}).items(): self.create_holiday_rules( holiday, guardian, create_mirror_for=other_guardian, periodicity=params['periodicity'], years=params.get('years', ''), ) return self.agenda def create_custody_rules(self, guardian, weeks, create_mirror_for=None): SharedCustodyRule.objects.create( agenda=self.agenda, days=list(range(7)), weeks=weeks, guardian=guardian ) if create_mirror_for: self.create_custody_rules(create_mirror_for, self.period_mirrors[weeks]) def create_holiday_rules(self, holiday, guardian, years, periodicity, create_mirror_for=None): rule = SharedCustodyHolidayRule.objects.create( agenda=self.agenda, holiday=holiday, guardian=guardian, years=years, periodicity=periodicity ) rule.update_or_create_periods() if years: rule = SharedCustodyHolidayRule.objects.create( agenda=self.agenda, holiday=holiday, guardian=guardian, years=self.period_mirrors[years], periodicity=self.period_mirrors[periodicity], ) rule.update_or_create_periods() if create_mirror_for: self.create_holiday_rules(holiday, create_mirror_for, years, self.period_mirrors[periodicity]) def get_settings_url(self, obj): request = self.context.get('request') return request.build_absolute_uri(obj.get_settings_url()) class SharedCustodyAgendaSerializer(serializers.ModelSerializer): class Meta: model = SharedCustodyAgenda fields = ['date_start']