640 lines
21 KiB
Python
640 lines
21 KiB
Python
# lingo - payment and billing system
|
|
# Copyright (C) 2022 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import copy
|
|
import dataclasses
|
|
import datetime
|
|
import decimal
|
|
from typing import List
|
|
|
|
from django.contrib.postgres.fields import JSONField
|
|
from django.db import models
|
|
from django.template import Context, RequestContext, Template, TemplateSyntaxError, VariableDoesNotExist
|
|
from django.utils.text import slugify
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
|
from lingo.agendas.models import Agenda, CheckType
|
|
from lingo.utils.misc import AgendaImportError, clean_import_data, generate_slug
|
|
|
|
|
|
class PricingError(Exception):
|
|
def __init__(self, details=None):
|
|
self.details = details or {}
|
|
super().__init__()
|
|
|
|
|
|
class AgendaPricingNotFound(PricingError):
|
|
pass
|
|
|
|
|
|
class CriteriaConditionNotFound(PricingError):
|
|
pass
|
|
|
|
|
|
class MultipleDefaultCriteriaCondition(PricingError):
|
|
pass
|
|
|
|
|
|
class PricingDataError(PricingError):
|
|
pass
|
|
|
|
|
|
class PricingDataFormatError(PricingError):
|
|
pass
|
|
|
|
|
|
class PricingUnknownCheckStatusError(PricingError):
|
|
pass
|
|
|
|
|
|
class PricingEventNotCheckedError(PricingError):
|
|
pass
|
|
|
|
|
|
class PricingBookingNotCheckedError(PricingError):
|
|
pass
|
|
|
|
|
|
class PricingMultipleBookingError(PricingError):
|
|
pass
|
|
|
|
|
|
class PricingBookingCheckTypeError(PricingError):
|
|
pass
|
|
|
|
|
|
class CriteriaCategory(models.Model):
|
|
label = models.CharField(_('Label'), max_length=150)
|
|
slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
|
|
|
|
class Meta:
|
|
ordering = ['label']
|
|
|
|
def __str__(self):
|
|
return self.label
|
|
|
|
def save(self, *args, **kwargs):
|
|
if not self.slug:
|
|
self.slug = generate_slug(self)
|
|
super().save(*args, **kwargs)
|
|
|
|
@property
|
|
def base_slug(self):
|
|
return slugify(self.label)
|
|
|
|
@classmethod
|
|
def import_json(cls, data, overwrite=False):
|
|
criterias = data.pop('criterias', [])
|
|
data = clean_import_data(cls, data)
|
|
category, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
|
|
|
|
if overwrite:
|
|
Criteria.objects.filter(category=category).delete()
|
|
|
|
for criteria in criterias:
|
|
criteria['category'] = category
|
|
Criteria.import_json(criteria)
|
|
|
|
return created, category
|
|
|
|
def export_json(self):
|
|
return {
|
|
'label': self.label,
|
|
'slug': self.slug,
|
|
'criterias': [a.export_json() for a in self.criterias.all()],
|
|
}
|
|
|
|
|
|
class Criteria(models.Model):
|
|
category = models.ForeignKey(
|
|
CriteriaCategory, verbose_name=_('Category'), on_delete=models.CASCADE, related_name='criterias'
|
|
)
|
|
label = models.CharField(_('Label'), max_length=150)
|
|
slug = models.SlugField(_('Identifier'), max_length=160)
|
|
condition = models.CharField(_('Condition'), max_length=1000, blank=True)
|
|
order = models.PositiveIntegerField()
|
|
default = models.BooleanField(
|
|
_('Default criteria'), default=False, help_text=_('Will be applied if no other criteria matches')
|
|
)
|
|
|
|
class Meta:
|
|
ordering = ['default', 'order']
|
|
unique_together = ['category', 'slug']
|
|
|
|
def __str__(self):
|
|
return self.label
|
|
|
|
def save(self, *args, **kwargs):
|
|
if self.default is True:
|
|
self.order = 0
|
|
elif self.order is None:
|
|
max_order = (
|
|
Criteria.objects.filter(category=self.category)
|
|
.aggregate(models.Max('order'))
|
|
.get('order__max')
|
|
or 0
|
|
)
|
|
self.order = max_order + 1
|
|
if not self.slug:
|
|
self.slug = generate_slug(self, category=self.category)
|
|
super().save(*args, **kwargs)
|
|
|
|
@property
|
|
def base_slug(self):
|
|
return slugify(self.label)
|
|
|
|
@classmethod
|
|
def import_json(cls, data):
|
|
data = clean_import_data(cls, data)
|
|
cls.objects.update_or_create(slug=data['slug'], category=data['category'], defaults=data)
|
|
|
|
def export_json(self):
|
|
return {
|
|
'label': self.label,
|
|
'slug': self.slug,
|
|
'condition': self.condition,
|
|
'order': self.order,
|
|
}
|
|
|
|
@property
|
|
def identifier(self):
|
|
return '%s:%s' % (self.category.slug, self.slug)
|
|
|
|
def compute_condition(self, context):
|
|
try:
|
|
template = Template('{%% if %s %%}OK{%% endif %%}' % self.condition)
|
|
except TemplateSyntaxError:
|
|
return False
|
|
return template.render(Context(context)) == 'OK'
|
|
|
|
|
|
class Pricing(models.Model):
|
|
label = models.CharField(_('Label'), max_length=150)
|
|
slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
|
|
categories = models.ManyToManyField(
|
|
CriteriaCategory,
|
|
related_name='pricings',
|
|
through='PricingCriteriaCategory',
|
|
)
|
|
criterias = models.ManyToManyField(Criteria)
|
|
extra_variables = JSONField(blank=True, default=dict)
|
|
|
|
class Meta:
|
|
ordering = ['label']
|
|
|
|
def __str__(self):
|
|
return self.label
|
|
|
|
def save(self, *args, **kwargs):
|
|
if not self.slug:
|
|
self.slug = generate_slug(self)
|
|
super().save(*args, **kwargs)
|
|
|
|
@property
|
|
def base_slug(self):
|
|
return slugify(self.label)
|
|
|
|
def get_extra_variables(self, request, original_context):
|
|
result = {}
|
|
context = RequestContext(request)
|
|
context.push(original_context)
|
|
for key, tplt in (self.extra_variables or {}).items():
|
|
try:
|
|
result[key] = Template(tplt).render(context)
|
|
except (TemplateSyntaxError, VariableDoesNotExist):
|
|
continue
|
|
return result
|
|
|
|
def get_extra_variables_keys(self):
|
|
return sorted((self.extra_variables or {}).keys())
|
|
|
|
@classmethod
|
|
def import_json(cls, data, overwrite=False):
|
|
data = data.copy()
|
|
categories = data.pop('categories', [])
|
|
categories_by_slug = {c.slug: c for c in CriteriaCategory.objects.all()}
|
|
criterias_by_categories_and_slug = {
|
|
(crit.category.slug, crit.slug): crit
|
|
for crit in Criteria.objects.select_related('category').all()
|
|
}
|
|
for category_data in categories:
|
|
category_slug = category_data['category']
|
|
if category_data['category'] not in categories_by_slug:
|
|
raise AgendaImportError(_('Missing "%s" pricing category') % category_data['category'])
|
|
for criteria_slug in category_data['criterias']:
|
|
if (category_slug, criteria_slug) not in criterias_by_categories_and_slug:
|
|
raise AgendaImportError(
|
|
_('Missing "%s" pricing criteria for "%s" category') % (criteria_slug, category_slug)
|
|
)
|
|
data = clean_import_data(cls, data)
|
|
pricing, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
|
|
|
|
PricingCriteriaCategory.objects.filter(pricing=pricing).delete()
|
|
criterias = []
|
|
for category_data in categories:
|
|
pricing.categories.add(
|
|
categories_by_slug[category_data['category']],
|
|
through_defaults={'order': category_data['order']},
|
|
)
|
|
for criteria_slug in category_data['criterias']:
|
|
criterias.append(criterias_by_categories_and_slug[(category_data['category'], criteria_slug)])
|
|
pricing.criterias.set(criterias)
|
|
|
|
return created, pricing
|
|
|
|
def export_json(self):
|
|
return {
|
|
'label': self.label,
|
|
'slug': self.slug,
|
|
'extra_variables': self.extra_variables,
|
|
'categories': [pcc.export_json() for pcc in PricingCriteriaCategory.objects.filter(pricing=self)],
|
|
}
|
|
|
|
def duplicate(self, label=None):
|
|
# clone current pricing
|
|
new_pricing = copy.deepcopy(self)
|
|
new_pricing.pk = None
|
|
new_pricing.label = label or _('Copy of %s') % self.label
|
|
# reset slug
|
|
new_pricing.slug = None
|
|
new_pricing.save()
|
|
|
|
# set criterias
|
|
new_pricing.criterias.set(self.criterias.all())
|
|
|
|
# set categories
|
|
for pcc in PricingCriteriaCategory.objects.filter(pricing=self):
|
|
pcc.duplicate(pricing_target=new_pricing)
|
|
|
|
return new_pricing
|
|
|
|
|
|
class PricingCriteriaCategory(models.Model):
|
|
pricing = models.ForeignKey(Pricing, on_delete=models.CASCADE)
|
|
category = models.ForeignKey(CriteriaCategory, on_delete=models.CASCADE)
|
|
order = models.PositiveIntegerField()
|
|
|
|
class Meta:
|
|
ordering = ['order']
|
|
unique_together = ['pricing', 'category']
|
|
|
|
def save(self, *args, **kwargs):
|
|
if self.order is None:
|
|
max_order = (
|
|
PricingCriteriaCategory.objects.filter(pricing=self.pricing)
|
|
.aggregate(models.Max('order'))
|
|
.get('order__max')
|
|
or 0
|
|
)
|
|
self.order = max_order + 1
|
|
super().save(*args, **kwargs)
|
|
|
|
def export_json(self):
|
|
return {
|
|
'category': self.category.slug,
|
|
'order': self.order,
|
|
'criterias': [c.slug for c in self.pricing.criterias.all() if c.category == self.category],
|
|
}
|
|
|
|
def duplicate(self, pricing_target):
|
|
new_pcc = copy.deepcopy(self)
|
|
new_pcc.pk = None
|
|
new_pcc.pricing = pricing_target
|
|
new_pcc.save()
|
|
return new_pcc
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class PricingMatrixCell:
|
|
criteria: Criteria
|
|
value: decimal.Decimal
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class PricingMatrixRow:
|
|
criteria: Criteria
|
|
cells: List[PricingMatrixCell]
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class PricingMatrix:
|
|
criteria: Criteria
|
|
rows: List[PricingMatrixRow]
|
|
|
|
|
|
class AgendaPricing(models.Model):
|
|
agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
|
|
pricing = models.ForeignKey(Pricing, on_delete=models.CASCADE)
|
|
date_start = models.DateField()
|
|
date_end = models.DateField()
|
|
pricing_data = JSONField(null=True)
|
|
|
|
@classmethod
|
|
def import_json(cls, data):
|
|
data = clean_import_data(cls, data)
|
|
cls.objects.update_or_create(
|
|
agenda=data['agenda'],
|
|
pricing=data['pricing'],
|
|
date_start=data['date_start'],
|
|
date_end=data['date_end'],
|
|
defaults=data,
|
|
)
|
|
|
|
def export_json(self):
|
|
return {
|
|
'pricing': self.pricing.slug,
|
|
'date_start': self.date_start.strftime('%Y-%m-%d'),
|
|
'date_end': self.date_end.strftime('%Y-%m-%d'),
|
|
'pricing_data': self.pricing_data,
|
|
}
|
|
|
|
@staticmethod
|
|
def get_pricing_data(
|
|
request,
|
|
agenda,
|
|
event,
|
|
subscription,
|
|
check_status,
|
|
booking,
|
|
user_external_id,
|
|
adult_external_id,
|
|
agenda_pricing=None,
|
|
):
|
|
agenda_pricing = agenda_pricing or AgendaPricing.get_agenda_pricing(agenda=agenda, event=event)
|
|
data = {
|
|
'event': event,
|
|
'subscription': subscription,
|
|
'booking': booking,
|
|
}
|
|
context = agenda_pricing.get_pricing_context(
|
|
request=request,
|
|
data=data,
|
|
user_external_id=user_external_id,
|
|
adult_external_id=adult_external_id,
|
|
)
|
|
pricing, criterias = agenda_pricing.compute_pricing(context=context)
|
|
modifier = agenda_pricing.get_booking_modifier(check_status=check_status)
|
|
return agenda_pricing.aggregate_pricing_data(
|
|
pricing=pricing, criterias=criterias, context=context, modifier=modifier
|
|
)
|
|
|
|
def aggregate_pricing_data(self, pricing, criterias, context, modifier):
|
|
if modifier['modifier_type'] == 'fixed':
|
|
pricing_amount = modifier['modifier_fixed']
|
|
else:
|
|
pricing_amount = pricing * modifier['modifier_rate'] / 100
|
|
return {
|
|
'pricing': pricing_amount,
|
|
'calculation_details': {
|
|
'pricing': pricing,
|
|
'criterias': criterias,
|
|
'context': context,
|
|
},
|
|
'booking_details': modifier,
|
|
}
|
|
|
|
@staticmethod
|
|
def get_agenda_pricing(agenda, event):
|
|
start_datetime = datetime.datetime.fromisoformat(event['start_datetime'])
|
|
try:
|
|
return agenda.agendapricing_set.get(
|
|
date_start__lte=start_datetime,
|
|
date_end__gt=start_datetime,
|
|
)
|
|
except (AgendaPricing.DoesNotExist, AgendaPricing.MultipleObjectsReturned):
|
|
raise AgendaPricingNotFound
|
|
|
|
def get_pricing_context(self, request, data, user_external_id, adult_external_id):
|
|
context = {'data': data, 'user_external_id': user_external_id, 'adult_external_id': adult_external_id}
|
|
if ':' in user_external_id:
|
|
context['user_external_raw_id'] = user_external_id.split(':')[1]
|
|
if ':' in adult_external_id:
|
|
context['adult_external_raw_id'] = adult_external_id.split(':')[1]
|
|
return self.pricing.get_extra_variables(request, context)
|
|
|
|
def format_pricing_data(self):
|
|
# format data to ignore category ordering
|
|
|
|
def _format(data):
|
|
if not data:
|
|
return
|
|
|
|
if not isinstance(data, dict):
|
|
yield [], data
|
|
return
|
|
|
|
for criteria, val in data.items():
|
|
result = list(_format(val))
|
|
for criterias, pricing in result:
|
|
yield [criteria] + criterias, pricing
|
|
|
|
return {
|
|
self.format_pricing_data_key(criterias): pricing
|
|
for criterias, pricing in _format(self.pricing_data)
|
|
}
|
|
|
|
def format_pricing_data_key(self, values):
|
|
return '||'.join(sorted(values))
|
|
|
|
def compute_pricing(self, context):
|
|
criterias = {}
|
|
categories = []
|
|
# for each category (ordered)
|
|
for category in self.pricing.categories.all().order_by('pricingcriteriacategory__order'):
|
|
criterias[category.slug] = None
|
|
categories.append(category.slug)
|
|
# find the first matching criteria (criterias are ordered)
|
|
for criteria in self.pricing.criterias.filter(category=category, default=False):
|
|
condition = criteria.compute_condition(context)
|
|
if condition:
|
|
criterias[category.slug] = criteria.slug
|
|
break
|
|
if criterias[category.slug] is not None:
|
|
continue
|
|
# if no match, take default criteria if only once defined
|
|
default_criterias = self.pricing.criterias.filter(category=category, default=True)
|
|
if len(default_criterias) > 1:
|
|
raise MultipleDefaultCriteriaCondition(details={'category': category.slug})
|
|
if not default_criterias:
|
|
raise CriteriaConditionNotFound(details={'category': category.slug})
|
|
criterias[category.slug] = default_criterias[0].slug
|
|
|
|
# now search for pricing values matching found criterias
|
|
pricing_data = self.format_pricing_data()
|
|
pricing = pricing_data.get(
|
|
self.format_pricing_data_key(['%s:%s' % (k, v) for k, v in criterias.items()])
|
|
)
|
|
if pricing is None:
|
|
raise PricingDataError(details={'criterias': criterias})
|
|
|
|
try:
|
|
pricing = decimal.Decimal(pricing)
|
|
except (decimal.InvalidOperation, ValueError, TypeError):
|
|
raise PricingDataFormatError(details={'pricing': pricing, 'wanted': 'decimal'})
|
|
|
|
return pricing, criterias
|
|
|
|
def get_booking_modifier(self, check_status):
|
|
status = check_status['status']
|
|
if status not in ['error', 'not-booked', 'cancelled', 'presence', 'absence']:
|
|
raise PricingUnknownCheckStatusError
|
|
|
|
if status == 'error':
|
|
reason = check_status['error_reason']
|
|
# event must be checked
|
|
if reason == 'event-not-checked':
|
|
raise PricingEventNotCheckedError
|
|
# booking must be checked
|
|
if reason == 'booking-not-checked':
|
|
raise PricingBookingNotCheckedError
|
|
# too many bookings found
|
|
if reason == 'too-many-bookings-found':
|
|
raise PricingMultipleBookingError
|
|
|
|
# no booking found
|
|
if status == 'not-booked':
|
|
return {
|
|
'status': 'not-booked',
|
|
'modifier_type': 'rate',
|
|
'modifier_rate': 0,
|
|
}
|
|
|
|
# booking cancelled
|
|
if status == 'cancelled':
|
|
return {
|
|
'status': 'cancelled',
|
|
'modifier_type': 'rate',
|
|
'modifier_rate': 0,
|
|
}
|
|
|
|
# no check_type, default rates
|
|
if not check_status['check_type']:
|
|
return {
|
|
'status': status,
|
|
'check_type_group': None,
|
|
'check_type': None,
|
|
'modifier_type': 'rate',
|
|
'modifier_rate': 100 if status == 'presence' else 0,
|
|
}
|
|
|
|
try:
|
|
check_type = CheckType.objects.get(
|
|
group=self.agenda.check_type_group_id, slug=check_status['check_type']
|
|
)
|
|
except CheckType.DoesNotExist:
|
|
raise PricingBookingCheckTypeError(
|
|
details={
|
|
'reason': 'not-found',
|
|
}
|
|
)
|
|
# check_type kind and user_was_present mismatch
|
|
if check_type.kind != status:
|
|
raise PricingBookingCheckTypeError(
|
|
details={
|
|
'check_type_group': check_type.group.slug,
|
|
'check_type': check_type.slug,
|
|
'reason': 'wrong-kind',
|
|
}
|
|
)
|
|
|
|
# get pricing modifier
|
|
if check_type.pricing is not None:
|
|
return {
|
|
'status': status,
|
|
'check_type_group': check_type.group.slug,
|
|
'check_type': check_type.slug,
|
|
'modifier_type': 'fixed',
|
|
'modifier_fixed': check_type.pricing,
|
|
}
|
|
if check_type.pricing_rate is not None:
|
|
return {
|
|
'status': status,
|
|
'check_type_group': check_type.group.slug,
|
|
'check_type': check_type.slug,
|
|
'modifier_type': 'rate',
|
|
'modifier_rate': check_type.pricing_rate,
|
|
}
|
|
# pricing not found
|
|
raise PricingBookingCheckTypeError(
|
|
details={
|
|
'check_type_group': check_type.group.slug,
|
|
'check_type': check_type.slug,
|
|
'reason': 'not-configured',
|
|
}
|
|
)
|
|
|
|
def iter_pricing_matrix(self):
|
|
categories = self.pricing.categories.all().order_by('pricingcriteriacategory__order')[:3]
|
|
pricing_data = self.format_pricing_data()
|
|
|
|
if not categories:
|
|
return
|
|
|
|
if len(categories) < 3:
|
|
yield self.get_pricing_matrix(
|
|
main_criteria=None, categories=categories, pricing_data=pricing_data
|
|
)
|
|
return
|
|
|
|
# criterias are ordered
|
|
for criteria in self.pricing.criterias.all():
|
|
if criteria.category != categories[0]:
|
|
continue
|
|
yield self.get_pricing_matrix(
|
|
main_criteria=criteria,
|
|
categories=categories[1:],
|
|
pricing_data=pricing_data,
|
|
)
|
|
|
|
def get_pricing_matrix(self, main_criteria, categories, pricing_data):
|
|
matrix = PricingMatrix(
|
|
criteria=main_criteria,
|
|
rows=[],
|
|
)
|
|
|
|
def get_pricing_matrix_cell(criteria_2, criteria_3):
|
|
criterias = [main_criteria, criteria_2, criteria_3]
|
|
key = self.format_pricing_data_key([c.identifier for c in criterias if c])
|
|
try:
|
|
value = decimal.Decimal(str(pricing_data.get(key)))
|
|
except (decimal.InvalidOperation, ValueError, TypeError):
|
|
value = None
|
|
return PricingMatrixCell(criteria=criteria_2, value=value)
|
|
|
|
if len(categories) < 2:
|
|
criterias_2 = [None]
|
|
criterias_3 = [c for c in self.pricing.criterias.all() if c.category == categories[0]]
|
|
else:
|
|
criterias_2 = [c for c in self.pricing.criterias.all() if c.category == categories[0]]
|
|
criterias_3 = [c for c in self.pricing.criterias.all() if c.category == categories[1]]
|
|
|
|
rows = [
|
|
PricingMatrixRow(
|
|
criteria=criteria_3,
|
|
cells=[
|
|
get_pricing_matrix_cell(
|
|
criteria_2,
|
|
criteria_3,
|
|
)
|
|
for criteria_2 in criterias_2
|
|
],
|
|
)
|
|
for criteria_3 in criterias_3
|
|
]
|
|
matrix.rows = rows
|
|
|
|
return matrix
|