lingo/lingo/api/serializers.py

298 lines
13 KiB
Python

# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
from django.utils.translation import ugettext_lazy as _
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from lingo.agendas.chrono import ChronoError, get_events, get_subscriptions
from lingo.agendas.models import Agenda
from lingo.pricing.models import AgendaPricing, AgendaPricingNotFound, PricingError
class CommaSeparatedStringField(serializers.ListField):
def get_value(self, dictionary):
return super(serializers.ListField, self).get_value(dictionary)
def to_internal_value(self, data):
data = [s.strip() for s in data.split(',') if s.strip()]
return super().to_internal_value(data)
class PricingComputeSerializer(serializers.Serializer):
slots = CommaSeparatedStringField(
required=False, child=serializers.CharField(max_length=160, allow_blank=False)
)
agenda = serializers.SlugField(required=False, allow_blank=False, max_length=160)
agenda_pricing = serializers.SlugField(required=False, allow_blank=False, max_length=160)
start_date = serializers.DateTimeField(required=False, input_formats=['iso-8601', '%Y-%m-%d'])
user_external_id = serializers.CharField(required=True, max_length=250)
adult_external_id = serializers.CharField(required=True, max_length=250)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._agenda_slugs = []
self._agendas = {}
self._serialized_events = {}
self._event_subscriptions = {}
self._agenda = None
self._agenda_subscription = None
self._agenda_pricing = None
self._billing_date = None
def validate_slots(self, value):
self._agendas = {a.slug: a for a in Agenda.objects.all()}
allowed_agenda_slugs = self._agendas.keys()
agenda_slugs = set()
for slot in value:
try:
agenda_slug, event_slug = slot.split('@')
except ValueError:
raise ValidationError(_('Invalid format for slot %s') % slot)
if not agenda_slug:
raise ValidationError(_('Missing agenda slug in slot %s') % slot)
if not event_slug:
raise ValidationError(_('Missing event slug in slot %s') % slot)
agenda_slugs.add(agenda_slug)
extra_agendas = agenda_slugs - set(allowed_agenda_slugs)
if extra_agendas:
extra_agendas = ', '.join(sorted(extra_agendas))
raise ValidationError(_('Unknown agendas: %s') % extra_agendas)
self._agenda_slugs = sorted(agenda_slugs)
try:
serialized_events = get_events(value)
except ChronoError as e:
raise ValidationError(e)
else:
for serialized_event in serialized_events:
event_slug = '%s@%s' % (serialized_event['agenda'], serialized_event['slug'])
self._serialized_events[event_slug] = serialized_event
return value
def _validate_agenda(self, value, start_date):
try:
self._agenda = Agenda.objects.get(slug=value)
try:
self._agenda_pricing = AgendaPricing.get_agenda_pricing(
agenda=self._agenda,
start_date=start_date.date(),
flat_fee_schedule=True,
)
if not self._agenda_pricing.subscription_required:
self._agenda_pricing = None
except AgendaPricingNotFound:
self._agenda_pricing = None
except Agenda.DoesNotExist:
raise ValidationError({'agenda': _('Unknown agenda: %s') % value})
return self._agenda
def _validate_agenda_pricing(self, value, start_date):
try:
self._agenda_pricing = AgendaPricing.objects.get(
slug=value,
flat_fee_schedule=True,
subscription_required=False,
date_start__lte=start_date.date(),
date_end__gt=start_date.date(),
)
except AgendaPricing.DoesNotExist:
raise ValidationError({'agenda_pricing': _('Unknown pricing: %s') % value})
return self._agenda_pricing
def validate(self, attrs):
super().validate(attrs)
if 'slots' not in attrs and 'agenda' not in attrs and 'agenda_pricing' not in attrs:
raise ValidationError(_('Either "slots", "agenda" or "agenda_pricing" parameter is required.'))
if 'agenda' in attrs:
# flat_fee_schedule mode + subscription_required True
if 'start_date' not in attrs:
raise ValidationError(
{'start_date': _('This field is required when using "agenda" parameter.')}
)
self._validate_agenda(attrs['agenda'], attrs['start_date'])
if 'agenda_pricing' in attrs:
# flat_fee_schedule mode + subscription_required False
if 'start_date' not in attrs:
raise ValidationError(
{'start_date': _('This field is required when using "agenda_pricing" parameter.')}
)
self._validate_agenda_pricing(attrs['agenda_pricing'], attrs['start_date'])
if attrs.get('start_date'):
# flat_fee_schedule mode: get billing_date from start_date param
self.get_billing_date(attrs['start_date'])
if attrs.get('user_external_id') and not attrs.get('agenda_pricing'):
# don't check subscriptions if agenda_pricing in params (== subscription_required is False)
self.get_subscriptions(attrs['user_external_id'], attrs.get('start_date'))
return attrs
def get_billing_date(self, start_date):
if self._agenda_pricing:
self._billing_date = (
self._agenda_pricing.billingdates.filter(date_start__lte=start_date)
.order_by('date_start')
.last()
)
if not self._billing_date:
self._billing_date = self._agenda_pricing.billingdates.order_by('date_start').first()
def get_subscriptions(self, user_external_id, start_date):
if self._serialized_events:
# event mode
self.get_subscriptions_for_events(user_external_id)
elif self._agenda and self._agenda_pricing:
# flat_fee_schedule mode
self.get_subscriptions_for_agenda(user_external_id, start_date)
def _get_subscription(self, subscriptions, start_date, end_date):
# get subscription matching start_date & end_date
for subscription in subscriptions:
sub_start_date = datetime.date.fromisoformat(subscription['date_start'])
sub_end_date = datetime.date.fromisoformat(subscription['date_end'])
if sub_start_date >= end_date:
continue
if sub_end_date <= start_date:
continue
return subscription
def get_subscriptions_for_events(self, user_external_id):
agenda_subscriptions = {}
# get subscription list for each agenda
for agenda_slug in self._agenda_slugs:
try:
agenda_subscriptions[agenda_slug] = get_subscriptions(agenda_slug, user_external_id)
except ChronoError as e:
raise ValidationError({'user_external_id': e})
# for each event, get a matching subscription
for event_slug, serialized_event in self._serialized_events.items():
start_date = datetime.datetime.fromisoformat(serialized_event['start_datetime']).date()
end_date = start_date + datetime.timedelta(days=1)
agenda_slug = serialized_event['agenda']
event_subscription = self._get_subscription(
agenda_subscriptions[agenda_slug], start_date, end_date
)
if event_subscription is None:
raise ValidationError(
{'user_external_id': _('No subscription found for event %s') % event_slug}
)
self._event_subscriptions[event_slug] = event_subscription
def get_subscriptions_for_agenda(self, user_external_id, start_date):
# get subscription list for this agenda
try:
subscriptions = get_subscriptions(self._agenda.slug, user_external_id)
except ChronoError as e:
raise ValidationError({'user_external_id': e})
# get correct period, from billing_date or agenda_pricing dates
if not self._agenda_pricing.billingdates.exists():
start_date = self._agenda_pricing.date_start
end_date = self._agenda_pricing.date_end
else:
start_date = self._billing_date.date_start
next_billing_date = self._agenda_pricing.billingdates.filter(date_start__gt=start_date).first()
end_date = next_billing_date.date_start if next_billing_date else self._agenda_pricing.date_end
# get a matching subscriptions
self._agenda_subscription = self._get_subscription(subscriptions, start_date, end_date)
if not self._agenda_subscription:
raise ValidationError(
{'user_external_id': _('No subscription found for agenda %s') % self._agenda.slug}
)
def compute(self, request):
try:
if not self.validated_data.get('slots'):
return self.compute_for_flat_fee_schedule(request)
return self.compute_for_event(request)
except PricingError as e:
return {
'error': type(e),
'error_details': e.details,
}
def compute_for_event(self, request):
result = []
event_slugs = sorted(self._serialized_events.keys())
for event_slug in event_slugs:
serialized_event = self._serialized_events[event_slug]
start_date = datetime.datetime.fromisoformat(serialized_event['start_datetime']).date()
agenda = self._agendas[serialized_event['agenda']]
try:
agenda_pricing = AgendaPricing.get_agenda_pricing(
agenda=agenda, start_date=start_date, flat_fee_schedule=False
)
pricing_data = agenda_pricing.get_pricing_data_for_event(
request=request,
agenda=agenda,
event=serialized_event,
subscription=self._event_subscriptions[event_slug],
check_status={
'status': 'presence',
'check_type': None,
},
booking={},
user_external_id=self.validated_data['user_external_id'],
adult_external_id=self.validated_data['adult_external_id'],
)
result.append(
{
'event': event_slug,
'pricing_data': pricing_data,
}
)
except AgendaPricingNotFound:
result.append(
{'event': event_slug, 'error': _('No agenda pricing found for event %s') % event_slug}
)
except PricingError as e:
result.append({'event': event_slug, 'error': e.details})
result = sorted(result, key=lambda d: d['event'])
return result
def compute_for_flat_fee_schedule(self, request):
result = {}
if self._agenda:
result['agenda'] = self._agenda.slug
if not self._agenda_pricing:
result['error'] = _('No agenda pricing found for agenda %s') % self._agenda.slug
return result
else:
result['agenda_pricing'] = self._agenda_pricing.slug
try:
pricing_data = self._agenda_pricing.get_pricing_data(
request=request,
pricing_date=(
self._billing_date.date_start if self._billing_date else self._agenda_pricing.date_start
),
subscription=self._agenda_subscription,
user_external_id=self.validated_data['user_external_id'],
adult_external_id=self.validated_data['adult_external_id'],
)
result['pricing_data'] = pricing_data
return result
except PricingError as e:
result['error'] = type(e).__name__
result['error_details'] = e.details
return result