154 lines
6.5 KiB
Python
154 lines
6.5 KiB
Python
# lingo - payment and billing system
|
|
# Copyright (C) 2022 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from rest_framework import serializers
|
|
from rest_framework.exceptions import ValidationError
|
|
|
|
from lingo.agendas.chrono import ChronoError, get_events, get_subscriptions
|
|
from lingo.agendas.models import Agenda
|
|
from lingo.pricing.models import AgendaPricing, AgendaPricingNotFound, PricingError
|
|
|
|
|
|
class CommaSeparatedStringField(serializers.ListField):
|
|
def get_value(self, dictionary):
|
|
return super(serializers.ListField, self).get_value(dictionary)
|
|
|
|
def to_internal_value(self, data):
|
|
data = [s.strip() for s in data.split(',') if s.strip()]
|
|
return super().to_internal_value(data)
|
|
|
|
|
|
class PricingComputeSerializer(serializers.Serializer):
|
|
slots = CommaSeparatedStringField(
|
|
required=True, child=serializers.CharField(max_length=160, allow_blank=False)
|
|
)
|
|
user_external_id = serializers.CharField(required=True, max_length=250)
|
|
adult_external_id = serializers.CharField(required=True, max_length=250)
|
|
|
|
agenda_slugs = []
|
|
agendas = {}
|
|
serialized_events = {}
|
|
event_subscriptions = {}
|
|
|
|
def validate_slots(self, value):
|
|
self.agendas = {a.slug: a for a in Agenda.objects.all()}
|
|
allowed_agenda_slugs = self.agendas.keys()
|
|
agenda_slugs = set()
|
|
for slot in value:
|
|
try:
|
|
agenda_slug, event_slug = slot.split('@')
|
|
except ValueError:
|
|
raise ValidationError(_('Invalid format for slot %s') % slot)
|
|
if not agenda_slug:
|
|
raise ValidationError(_('Missing agenda slug in slot %s') % slot)
|
|
if not event_slug:
|
|
raise ValidationError(_('Missing event slug in slot %s') % slot)
|
|
agenda_slugs.add(agenda_slug)
|
|
extra_agendas = agenda_slugs - set(allowed_agenda_slugs)
|
|
if extra_agendas:
|
|
extra_agendas = ', '.join(sorted(extra_agendas))
|
|
raise ValidationError(_('Unknown agendas: %s') % extra_agendas)
|
|
self.agenda_slugs = sorted(agenda_slugs)
|
|
|
|
try:
|
|
serialized_events = get_events(value)
|
|
except ChronoError as e:
|
|
raise ValidationError(e)
|
|
else:
|
|
for serialized_event in serialized_events:
|
|
event_slug = '%s@%s' % (serialized_event['agenda'], serialized_event['slug'])
|
|
self.serialized_events[event_slug] = serialized_event
|
|
|
|
return value
|
|
|
|
def get_subscriptions(self, user_external_id):
|
|
agenda_subscriptions = {}
|
|
for agenda_slug in self.agenda_slugs:
|
|
try:
|
|
agenda_subscriptions[agenda_slug] = get_subscriptions(agenda_slug, user_external_id)
|
|
except ChronoError as e:
|
|
raise ValidationError({'user_external_id': e})
|
|
|
|
self.event_subscriptions = {}
|
|
for event_slug, serialized_event in self.serialized_events.items():
|
|
start_date = datetime.datetime.fromisoformat(serialized_event['start_datetime']).date()
|
|
end_date = start_date + datetime.timedelta(days=1)
|
|
agenda_slug = serialized_event['agenda']
|
|
event_subscription = None
|
|
for subscription in agenda_subscriptions[agenda_slug]:
|
|
sub_start_date = datetime.date.fromisoformat(subscription['date_start'])
|
|
sub_end_date = datetime.date.fromisoformat(subscription['date_end'])
|
|
if sub_start_date >= end_date:
|
|
continue
|
|
if sub_end_date <= start_date:
|
|
continue
|
|
event_subscription = subscription
|
|
break
|
|
if event_subscription is None:
|
|
raise ValidationError(
|
|
{'user_external_id': _('No subscription found for event %s') % event_slug}
|
|
)
|
|
self.event_subscriptions[event_slug] = event_subscription
|
|
|
|
def validate(self, attrs):
|
|
super().validate(attrs)
|
|
if attrs.get('user_external_id') and self.serialized_events:
|
|
self.get_subscriptions(attrs['user_external_id'])
|
|
return attrs
|
|
|
|
def compute(self, request):
|
|
if not self.serialized_events or not self.event_subscriptions:
|
|
return
|
|
result = []
|
|
event_slugs = sorted(self.serialized_events.keys())
|
|
for event_slug in event_slugs:
|
|
serialized_event = self.serialized_events[event_slug]
|
|
start_date = datetime.datetime.fromisoformat(serialized_event['start_datetime']).date()
|
|
agenda = self.agendas[serialized_event['agenda']]
|
|
try:
|
|
agenda_pricing = AgendaPricing.get_agenda_pricing(agenda=agenda, start_date=start_date)
|
|
pricing_data = agenda_pricing.get_pricing_data_for_event(
|
|
request=request,
|
|
agenda=agenda,
|
|
event=serialized_event,
|
|
subscription=self.event_subscriptions[event_slug],
|
|
check_status={
|
|
'status': 'presence',
|
|
'check_type': None,
|
|
},
|
|
booking={},
|
|
user_external_id=self.validated_data['user_external_id'],
|
|
adult_external_id=self.validated_data['adult_external_id'],
|
|
)
|
|
result.append(
|
|
{
|
|
'event': event_slug,
|
|
'pricing_data': pricing_data,
|
|
}
|
|
)
|
|
except AgendaPricingNotFound:
|
|
result.append(
|
|
{'event': event_slug, 'error': _('No agenda pricing found for event %s') % event_slug}
|
|
)
|
|
except PricingError as e:
|
|
result.append({'event': event_slug, 'error': e.details})
|
|
|
|
result = sorted(result, key=lambda d: d['event'])
|
|
return result
|