lingo/lingo/pricing/models.py

717 lines
24 KiB
Python

# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import dataclasses
import datetime
import decimal
from typing import List
from django.contrib.postgres.fields import JSONField
from django.db import models
from django.template import Context, RequestContext, Template, TemplateSyntaxError, VariableDoesNotExist
from django.utils.text import slugify
from django.utils.translation import ugettext_lazy as _
from lingo.agendas.models import Agenda, CheckType
from lingo.utils.misc import AgendaImportError, clean_import_data, generate_slug
class PricingError(Exception):
    """Base class of the errors raised while computing a pricing.

    :param details: optional mapping giving structured information about the
        failure; stored on the ``details`` attribute (an empty dict when
        nothing, or an empty value, is given).

    The exception itself is built without positional arguments, so its string
    representation is empty: callers are expected to read ``details``.
    """

    def __init__(self, details=None):
        super().__init__()
        self.details = details if details else {}
class AgendaPricingNotFound(PricingError):
    """Raised by ``AgendaPricing.get_agenda_pricing`` when the lookup matches no, or several, agenda pricings."""
class CriteriaConditionNotFound(PricingError):
    """Raised by ``AgendaPricing.compute_pricing`` when a category has neither a matching nor a default criteria."""
class MultipleDefaultCriteriaCondition(PricingError):
    """Raised by ``AgendaPricing.compute_pricing`` when a category has more than one default criteria."""
class PricingDataError(PricingError):
    """Raised by ``AgendaPricing.compute_pricing`` when no pricing value exists for the selected criterias."""
class PricingDataFormatError(PricingError):
    """Raised by ``AgendaPricing.compute_pricing`` when the stored pricing value cannot be converted to a decimal."""
class PricingUnknownCheckStatusError(PricingError):
    """Raised by ``AgendaPricing.get_booking_modifier`` when the check status is not one of the handled values."""
class PricingEventNotCheckedError(PricingError):
    """Raised by ``AgendaPricing.get_booking_modifier`` for an ``error`` status with reason ``event-not-checked``."""
class PricingBookingNotCheckedError(PricingError):
    """Raised by ``AgendaPricing.get_booking_modifier`` for an ``error`` status with reason ``booking-not-checked``."""
class PricingMultipleBookingError(PricingError):
    """Raised by ``AgendaPricing.get_booking_modifier`` for an ``error`` status with reason ``too-many-bookings-found``."""
class PricingBookingCheckTypeError(PricingError):
    """Raised by ``AgendaPricing.get_booking_modifier`` when the booking check type is unusable.

    The ``reason`` entry of ``details`` is ``not-found``, ``wrong-kind`` or
    ``not-configured``.
    """
class CriteriaCategory(models.Model):
    """A labelled group of pricing criterias.

    The criterias of a category are reachable through the ``criterias``
    reverse relation declared on :class:`Criteria`.
    """

    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)

    class Meta:
        ordering = ['label']

    def __str__(self):
        return self.label

    def save(self, *args, **kwargs):
        """Save the category, asking ``generate_slug`` for a slug when none is set."""
        if not self.slug:
            self.slug = generate_slug(self)
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slugified label (presumably the base used by ``generate_slug`` -- TODO confirm)."""
        return slugify(self.label)

    @classmethod
    def import_json(cls, data, overwrite=False):
        """Create or update a category, and its criterias, from exported data.

        :param data: mapping as produced by :meth:`export_json`; it is NOT
            modified (the other ``import_json`` methods of this module copy
            their input too).
        :param overwrite: when true, the criterias already attached to the
            category are deleted before the imported ones are created.
        :returns: ``(created, category)``.
        """
        # shallow copy: popping 'criterias' must not alter the caller's payload
        data = data.copy()
        criterias = data.pop('criterias', [])
        data = clean_import_data(cls, data)
        category, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)

        if overwrite:
            Criteria.objects.filter(category=category).delete()

        for criteria in criterias:
            # build a new dict instead of writing the category into the caller's one
            Criteria.import_json({**criteria, 'category': category})

        return created, category

    def export_json(self):
        """Return a JSON-serializable description of the category and its criterias."""
        return {
            'label': self.label,
            'slug': self.slug,
            'criterias': [a.export_json() for a in self.criterias.all()],
        }
class Criteria(models.Model):
    """A pricing criteria of a category, selected by evaluating ``condition``.

    ``condition`` is the expression of a Django ``{% if %}`` tag (see
    :meth:`compute_condition`). A criteria flagged ``default`` always gets
    order 0 on save.
    """

    category = models.ForeignKey(
        CriteriaCategory, verbose_name=_('Category'), on_delete=models.CASCADE, related_name='criterias'
    )
    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160)
    condition = models.CharField(_('Condition'), max_length=1000, blank=True)
    order = models.PositiveIntegerField()
    default = models.BooleanField(
        _('Default criteria'), default=False, help_text=_('Will be applied if no other criteria matches')
    )

    class Meta:
        ordering = ['default', 'order']
        unique_together = ['category', 'slug']

    def __str__(self):
        return self.label

    def save(self, *args, **kwargs):
        """Save the criteria, filling in ``order`` and ``slug`` when needed.

        A default criteria is forced to order 0; a criteria without order is
        placed after the highest order currently used in its category.
        """
        if self.default is True:
            self.order = 0
        elif self.order is None:
            max_order = (
                Criteria.objects.filter(category=self.category)
                .aggregate(models.Max('order'))
                .get('order__max')
                or 0
            )
            self.order = max_order + 1
        if not self.slug:
            self.slug = generate_slug(self, category=self.category)
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slugified label (presumably the base used by ``generate_slug`` -- TODO confirm)."""
        return slugify(self.label)

    @classmethod
    def import_json(cls, data):
        """Create or update the criteria identified by ``data['slug']`` in ``data['category']``."""
        data = clean_import_data(cls, data)
        cls.objects.update_or_create(slug=data['slug'], category=data['category'], defaults=data)

    def export_json(self):
        """Return a JSON-serializable description of the criteria.

        ``default`` is part of the export: without it an export/import round
        trip would recreate the default criteria as an ordinary one (order 0,
        ``default=False``) and the category would lose its fallback.
        """
        return {
            'label': self.label,
            'slug': self.slug,
            'condition': self.condition,
            'order': self.order,
            'default': self.default,
        }

    @property
    def identifier(self):
        """``<category slug>:<criteria slug>``, the key format used in pricing data."""
        return '%s:%s' % (self.category.slug, self.slug)

    def compute_condition(self, context):
        """Tell whether ``condition`` is true for the given context.

        :param context: mapping of the variables available to the condition.
        :returns: ``True`` when the condition evaluates to a true value,
            ``False`` otherwise -- including when the condition is not a
            valid ``{% if %}`` expression (an empty condition, for instance).
        """
        try:
            template = Template('{%% if %s %%}OK{%% endif %%}' % self.condition)
        except TemplateSyntaxError:
            return False
        return template.render(Context(context)) == 'OK'
class Pricing(models.Model):
    """A pricing model: ordered criteria categories, selected criterias and extra variables.

    ``extra_variables`` maps a variable name to a Django template source;
    see :meth:`get_extra_variables`.
    """

    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
    categories = models.ManyToManyField(
        CriteriaCategory,
        related_name='pricings',
        through='PricingCriteriaCategory',
    )
    criterias = models.ManyToManyField(Criteria)
    extra_variables = JSONField(blank=True, default=dict)

    class Meta:
        ordering = ['label']

    def __str__(self):
        return self.label

    def save(self, *args, **kwargs):
        """Save the pricing, asking ``generate_slug`` for a slug when none is set."""
        if not self.slug:
            self.slug = generate_slug(self)
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slugified label (presumably the base used by ``generate_slug`` -- TODO confirm)."""
        return slugify(self.label)

    def get_extra_variables(self, request, original_context):
        """Render every extra variable template and return the results.

        :param request: request used to build the ``RequestContext``.
        :param original_context: mapping pushed on the context before rendering.
        :returns: dict mapping each variable name to its rendered string;
            variables whose template is invalid, or references a missing
            variable, are left out.
        """
        context = RequestContext(request)
        context.push(original_context)
        rendered = {}
        templates = self.extra_variables or {}
        for name, source in templates.items():
            try:
                value = Template(source).render(context)
            except (TemplateSyntaxError, VariableDoesNotExist):
                # best effort: a broken variable is skipped, not fatal
                continue
            rendered[name] = value
        return rendered

    def get_extra_variables_keys(self):
        """Return the sorted names of the extra variables."""
        return sorted(self.extra_variables or {})

    @classmethod
    def import_json(cls, data, overwrite=False):
        """Create or update a pricing model from exported data.

        Every referenced category and criteria must already exist; this is
        checked before anything is written.

        :param data: mapping as produced by :meth:`export_json` (not modified
            at top level: a shallow copy is used).
        :param overwrite: accepted for signature parity with the other
            importers; not used here.
        :returns: ``(created, pricing)``.
        :raises AgendaImportError: when a category or a criteria is missing.
        """
        data = data.copy()
        categories = data.pop('categories', [])
        known_categories = {category.slug: category for category in CriteriaCategory.objects.all()}
        known_criterias = {
            (criteria.category.slug, criteria.slug): criteria
            for criteria in Criteria.objects.select_related('category').all()
        }

        # validation pass, nothing is saved yet
        for category_data in categories:
            category_slug = category_data['category']
            if category_slug not in known_categories:
                raise AgendaImportError(_('Missing "%s" pricing category') % category_slug)
            for criteria_slug in category_data['criterias']:
                if (category_slug, criteria_slug) not in known_criterias:
                    raise AgendaImportError(
                        _('Missing "%s" pricing criteria for "%s" category') % (criteria_slug, category_slug)
                    )

        data = clean_import_data(cls, data)
        pricing, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)

        # category links are always rebuilt from the imported data
        PricingCriteriaCategory.objects.filter(pricing=pricing).delete()
        selected_criterias = []
        for category_data in categories:
            category_slug = category_data['category']
            pricing.categories.add(
                known_categories[category_slug],
                through_defaults={'order': category_data['order']},
            )
            selected_criterias.extend(
                known_criterias[(category_slug, criteria_slug)]
                for criteria_slug in category_data['criterias']
            )
        pricing.criterias.set(selected_criterias)

        return created, pricing

    def export_json(self):
        """Return a JSON-serializable description of the pricing model."""
        category_links = PricingCriteriaCategory.objects.filter(pricing=self)
        return {
            'label': self.label,
            'slug': self.slug,
            'extra_variables': self.extra_variables,
            'categories': [link.export_json() for link in category_links],
        }

    def duplicate(self, label=None):
        """Save and return a copy of this pricing, with its criterias and category links.

        :param label: label of the copy; defaults to "Copy of <label>".
        """
        clone = copy.deepcopy(self)
        # no primary key and no slug: save() inserts a new row with a new slug
        clone.pk = None
        clone.label = label or _('Copy of %s') % self.label
        clone.slug = None
        clone.save()

        clone.criterias.set(self.criterias.all())
        for link in PricingCriteriaCategory.objects.filter(pricing=self):
            link.duplicate(pricing_target=clone)

        return clone
class PricingCriteriaCategory(models.Model):
    """Ordered link between a pricing model and one of its criteria categories."""

    pricing = models.ForeignKey(Pricing, on_delete=models.CASCADE)
    category = models.ForeignKey(CriteriaCategory, on_delete=models.CASCADE)
    order = models.PositiveIntegerField()

    class Meta:
        ordering = ['order']
        unique_together = ['pricing', 'category']

    def save(self, *args, **kwargs):
        """Save the link, placing it after the existing ones when no order is set."""
        if self.order is None:
            max_order = (
                PricingCriteriaCategory.objects.filter(pricing=self.pricing)
                .aggregate(models.Max('order'))
                .get('order__max')
                or 0
            )
            self.order = max_order + 1
        super().save(*args, **kwargs)

    def export_json(self):
        """Return the category slug, the order and the slugs of the pricing's criterias of this category."""
        return {
            'category': self.category.slug,
            'order': self.order,
            # compare foreign key ids: comparing `c.category` instances would
            # load the related category once per criteria
            'criterias': [
                c.slug for c in self.pricing.criterias.all() if c.category_id == self.category_id
            ],
        }

    def duplicate(self, pricing_target):
        """Save and return a copy of this link attached to ``pricing_target``."""
        new_pcc = copy.deepcopy(self)
        # no primary key: save() inserts a new row
        new_pcc.pk = None
        new_pcc.pricing = pricing_target
        new_pcc.save()
        return new_pcc
@dataclasses.dataclass
class PricingMatrixCell:
    """One cell of a pricing matrix, as built by ``AgendaPricing.get_pricing_matrix``."""

    # column criteria; None when the pricing has a single category
    criteria: Criteria
    # price found for the cell; None when missing or not convertible to a decimal
    value: decimal.Decimal
@dataclasses.dataclass
class PricingMatrixRow:
    """One row of a pricing matrix, as built by ``AgendaPricing.get_pricing_matrix``."""

    # criteria shared by every cell of the row
    criteria: Criteria
    cells: List[PricingMatrixCell]
@dataclasses.dataclass
class PricingMatrix:
    """A table of pricing values, as yielded by ``AgendaPricing.iter_pricing_matrix``."""

    # criteria of the first category the matrix is about; None when the
    # pricing has fewer than three categories
    criteria: Criteria
    rows: List[PricingMatrixRow]
class AgendaPricing(models.Model):
    """Application of a :class:`Pricing` model to agendas over a period.

    ``pricing_data`` stores the prices as nested dicts: each level is keyed
    by ``<category slug>:<criteria slug>`` identifiers and the leaves are the
    pricing values (see :meth:`format_pricing_data`).
    """

    label = models.CharField(_('Label'), max_length=150, null=True)
    slug = models.SlugField(_('Identifier'), max_length=160, null=True)
    agendas = models.ManyToManyField(Agenda, related_name='agendapricings')
    pricing = models.ForeignKey(Pricing, on_delete=models.CASCADE, verbose_name=_('Pricing model'))
    date_start = models.DateField(_('Start date'))
    date_end = models.DateField(_('End date'))
    flat_fee_schedule = models.BooleanField(_('Flat fee schedule'), default=False)
    subscription_required = models.BooleanField(_('Subscription is required'), default=True)
    pricing_data = JSONField(null=True)

    def __str__(self):
        return self.label or self.pricing.label

    def save(self, *args, **kwargs):
        """Save the object, asking ``generate_slug`` for a slug when none is set."""
        if not self.slug:
            self.slug = generate_slug(self)
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slugified label, falling back on the label of the pricing model."""
        return slugify(self.label or self.pricing.label)

    def export_json(self):
        """Return a JSON-serializable description of the agenda pricing.

        ``flat_fee_schedule`` and ``subscription_required`` are exported so
        that an import does not silently reset them to the field defaults.
        """
        return {
            'label': self.label,
            'slug': self.slug,
            'pricing': self.pricing.slug,
            'date_start': self.date_start.strftime('%Y-%m-%d'),
            'date_end': self.date_end.strftime('%Y-%m-%d'),
            'flat_fee_schedule': self.flat_fee_schedule,
            'subscription_required': self.subscription_required,
            'pricing_data': self.pricing_data,
            'agendas': [a.slug for a in self.agendas.all()],
            'billing_dates': [bd.export_json() for bd in self.billingdates.all()],
        }

    @classmethod
    def import_json(cls, data, overwrite=False):
        """Create or update an agenda pricing from exported data.

        :param data: mapping as produced by :meth:`export_json`; a deep copy
            is used, the argument is not modified.
        :param overwrite: when true and the object already existed, its
            agendas and billing dates are replaced instead of completed.
        :returns: ``(created, agenda_pricing)``.
        :raises AgendaImportError: when a referenced agenda or the pricing
            model does not exist.
        """
        data = copy.deepcopy(data)
        agenda_slugs = data.pop('agendas', None) or []
        billing_dates = data.pop('billing_dates', None) or []
        data = clean_import_data(cls, data)

        # resolve every reference before writing anything
        agendas = []
        for agenda_slug in agenda_slugs:
            try:
                agendas.append(Agenda.objects.get(slug=agenda_slug))
            except Agenda.DoesNotExist:
                raise AgendaImportError(_('Missing "%s" agenda') % agenda_slug)
        try:
            data['pricing'] = Pricing.objects.get(slug=data['pricing'])
        except Pricing.DoesNotExist:
            raise AgendaImportError(_('Missing "%s" pricing model') % data['pricing'])

        agenda_pricing, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)

        replace_existing = overwrite and not created
        if replace_existing:
            agenda_pricing.agendas.clear()
        agenda_pricing.agendas.add(*agendas)

        if replace_existing:
            agenda_pricing.billingdates.all().delete()
        for billing_date in billing_dates:
            billing_date['agenda_pricing'] = agenda_pricing
            BillingDate.import_json(billing_date)

        return created, agenda_pricing

    def get_pricing_data(self, request, pricing_date, user_external_id, adult_external_id, subscription=None):
        """Compute the pricing in flat fee schedule mode.

        :param subscription: None if subscription_required is False.
        :returns: dict with the ``pricing`` amount and the
            ``calculation_details`` (pricing, criterias, context).
        :raises PricingError: see :meth:`compute_pricing`.
        """
        data = {
            'pricing_date': pricing_date,  # date to use for QF
            'subscription': subscription,
        }
        context = self.get_pricing_context(
            request=request,
            data=data,
            user_external_id=user_external_id,
            adult_external_id=adult_external_id,
        )
        pricing, criterias = self.compute_pricing(context=context)
        return {
            'pricing': pricing,
            'calculation_details': {
                'pricing': pricing,
                'criterias': criterias,
                'context': context,
            },
        }

    def get_pricing_data_for_event(
        self, request, agenda, event, subscription, check_status, booking, user_external_id, adult_external_id
    ):
        """Compute the pricing of an event, adjusted by the booking check status.

        :param event: mapping with at least a ``start_datetime`` ISO string,
            whose date is used as pricing date.
        :param check_status: mapping described in :meth:`get_booking_modifier`.
        :returns: see :meth:`aggregate_pricing_data`.
        :raises PricingError: see :meth:`compute_pricing` and
            :meth:`get_booking_modifier`.
        """
        event_date = datetime.datetime.fromisoformat(event['start_datetime']).date()
        data = {
            'pricing_date': event_date,  # date to use for QF
            'event': event,
            'subscription': subscription,
            'booking': booking,
        }
        context = self.get_pricing_context(
            request=request,
            data=data,
            user_external_id=user_external_id,
            adult_external_id=adult_external_id,
        )
        pricing, criterias = self.compute_pricing(context=context)
        modifier = self.get_booking_modifier(agenda=agenda, check_status=check_status)
        return self.aggregate_pricing_data(
            pricing=pricing, criterias=criterias, context=context, modifier=modifier
        )

    def aggregate_pricing_data(self, pricing, criterias, context, modifier):
        """Apply the booking modifier to the computed pricing.

        A ``fixed`` modifier replaces the pricing by ``modifier_fixed``; any
        other modifier type is handled as a percentage (``modifier_rate``).

        :returns: dict with the final ``pricing``, the ``calculation_details``
            (unmodified pricing, criterias, context) and the modifier as
            ``booking_details``.
        """
        if modifier['modifier_type'] == 'fixed':
            pricing_amount = modifier['modifier_fixed']
        else:
            pricing_amount = pricing * modifier['modifier_rate'] / 100
        return {
            'pricing': pricing_amount,
            'calculation_details': {
                'pricing': pricing,
                'criterias': criterias,
                'context': context,
            },
            'booking_details': modifier,
        }

    @staticmethod
    def get_agenda_pricing(agenda, start_date, flat_fee_schedule):
        """Return the agenda pricing of ``agenda`` applicable at ``start_date``.

        ``date_start`` is inclusive and ``date_end`` exclusive.

        :raises AgendaPricingNotFound: when no pricing, or more than one,
            matches.
        """
        try:
            return agenda.agendapricings.get(
                date_start__lte=start_date,
                date_end__gt=start_date,
                flat_fee_schedule=flat_fee_schedule,
            )
        except (AgendaPricing.DoesNotExist, AgendaPricing.MultipleObjectsReturned):
            raise AgendaPricingNotFound

    def get_pricing_context(self, request, data, user_external_id, adult_external_id):
        """Build the context used to evaluate the criteria conditions.

        ``data`` and the external ids (plus their "raw" form, the part after
        the first colon, when they contain one) are made available to the
        extra variable templates of the pricing model.

        :returns: the result of ``Pricing.get_extra_variables``, i.e. the
            rendered extra variables only.
        """
        context = {'data': data, 'user_external_id': user_external_id, 'adult_external_id': adult_external_id}
        if ':' in user_external_id:
            context['user_external_raw_id'] = user_external_id.split(':')[1]
        if ':' in adult_external_id:
            context['adult_external_raw_id'] = adult_external_id.split(':')[1]
        return self.pricing.get_extra_variables(request, context)

    def format_pricing_data(self):
        """Flatten ``pricing_data`` into a dict keyed independently of category ordering.

        :returns: dict mapping :meth:`format_pricing_data_key` of the criteria
            identifiers met along a path to the leaf value of that path.
            Empty leaves (``None``, empty string or dict) are ignored, but a
            numeric zero is a valid price and is kept.
        """

        def _is_price(value):
            # bool is an int subclass but is not a price
            return isinstance(value, (int, float, decimal.Decimal)) and not isinstance(value, bool)

        def _format(data):
            # a falsy value means "nothing here", except for a zero price
            if not data and not _is_price(data):
                return

            if not isinstance(data, dict):
                yield [], data
                return

            for criteria, val in data.items():
                for criterias, pricing in _format(val):
                    yield [criteria] + criterias, pricing

        return {
            self.format_pricing_data_key(criterias): pricing
            for criterias, pricing in _format(self.pricing_data)
        }

    def format_pricing_data_key(self, values):
        """Return the order-independent key of a collection of criteria identifiers."""
        return '||'.join(sorted(values))

    def compute_pricing(self, context):
        """Select one criteria per category and return the matching pricing.

        :param context: variables available to the criteria conditions.
        :returns: ``(pricing, criterias)`` where ``pricing`` is a Decimal and
            ``criterias`` maps each category slug to the selected criteria slug.
        :raises MultipleDefaultCriteriaCondition: no criteria matches and the
            category has several default criterias.
        :raises CriteriaConditionNotFound: no criteria matches and the
            category has no default criteria.
        :raises PricingDataError: no pricing value for the selected criterias.
        :raises PricingDataFormatError: the pricing value is not a decimal.
        """
        criterias = {}
        # for each category (ordered)
        for category in self.pricing.categories.all().order_by('pricingcriteriacategory__order'):
            criterias[category.slug] = None
            # find the first matching criteria (criterias are ordered)
            for criteria in self.pricing.criterias.filter(category=category, default=False):
                if criteria.compute_condition(context):
                    criterias[category.slug] = criteria.slug
                    break
            if criterias[category.slug] is not None:
                continue
            # if no match, take default criteria if only once defined
            default_criterias = self.pricing.criterias.filter(category=category, default=True)
            if len(default_criterias) > 1:
                raise MultipleDefaultCriteriaCondition(details={'category': category.slug})
            if not default_criterias:
                raise CriteriaConditionNotFound(details={'category': category.slug})
            criterias[category.slug] = default_criterias[0].slug

        # now search for pricing values matching found criterias
        pricing_data = self.format_pricing_data()
        pricing = pricing_data.get(
            self.format_pricing_data_key(['%s:%s' % (k, v) for k, v in criterias.items()])
        )
        if pricing is None:
            raise PricingDataError(details={'criterias': criterias})

        try:
            pricing = decimal.Decimal(pricing)
        except (decimal.InvalidOperation, ValueError, TypeError):
            raise PricingDataFormatError(details={'pricing': pricing, 'wanted': 'decimal'})

        return pricing, criterias

    def get_booking_modifier(self, agenda, check_status):
        """Return how the check status of a booking modifies the pricing.

        :param agenda: agenda of the event; its ``check_type_group_id`` is
            used to look the check type up.
        :param check_status: mapping with a ``status`` key, plus
            ``error_reason`` for the ``error`` status and ``check_type``
            (a check type slug, possibly empty) for the checked statuses.
        :returns: dict with ``status``, ``modifier_type`` (``rate`` or
            ``fixed``) and the matching ``modifier_rate`` / ``modifier_fixed``;
            for checked bookings also ``check_type_group`` and ``check_type``.
        :raises PricingUnknownCheckStatusError: unknown status.
        :raises PricingEventNotCheckedError: the event is not checked.
        :raises PricingBookingNotCheckedError: the booking is not checked.
        :raises PricingMultipleBookingError: several bookings were found.
        :raises PricingBookingCheckTypeError: check type not found, of the
            wrong kind, or without pricing configuration.
        """
        status = check_status['status']
        if status not in ['error', 'not-booked', 'cancelled', 'presence', 'absence']:
            raise PricingUnknownCheckStatusError

        if status == 'error':
            reason = check_status['error_reason']
            # event must be checked
            if reason == 'event-not-checked':
                raise PricingEventNotCheckedError
            # booking must be checked
            if reason == 'booking-not-checked':
                raise PricingBookingNotCheckedError
            # too many bookings found
            if reason == 'too-many-bookings-found':
                raise PricingMultipleBookingError
            # NOTE(review): an 'error' status with any other reason falls
            # through to the check type handling below -- confirm this is intended

        # no booking found
        if status == 'not-booked':
            return {
                'status': 'not-booked',
                'modifier_type': 'rate',
                'modifier_rate': 0,
            }

        # booking cancelled
        if status == 'cancelled':
            return {
                'status': 'cancelled',
                'modifier_type': 'rate',
                'modifier_rate': 0,
            }

        # no check_type, default rates
        if not check_status['check_type']:
            return {
                'status': status,
                'check_type_group': None,
                'check_type': None,
                'modifier_type': 'rate',
                'modifier_rate': 100 if status == 'presence' else 0,
            }

        try:
            check_type = CheckType.objects.get(
                group=agenda.check_type_group_id, slug=check_status['check_type']
            )
        except CheckType.DoesNotExist:
            raise PricingBookingCheckTypeError(
                details={
                    'reason': 'not-found',
                }
            )

        # check_type kind and user_was_present mismatch
        if check_type.kind != status:
            raise PricingBookingCheckTypeError(
                details={
                    'check_type_group': check_type.group.slug,
                    'check_type': check_type.slug,
                    'reason': 'wrong-kind',
                }
            )

        # get pricing modifier: a fixed pricing takes precedence over a rate
        if check_type.pricing is not None:
            return {
                'status': status,
                'check_type_group': check_type.group.slug,
                'check_type': check_type.slug,
                'modifier_type': 'fixed',
                'modifier_fixed': check_type.pricing,
            }
        if check_type.pricing_rate is not None:
            return {
                'status': status,
                'check_type_group': check_type.group.slug,
                'check_type': check_type.slug,
                'modifier_type': 'rate',
                'modifier_rate': check_type.pricing_rate,
            }

        # pricing not found
        raise PricingBookingCheckTypeError(
            details={
                'check_type_group': check_type.group.slug,
                'check_type': check_type.slug,
                'reason': 'not-configured',
            }
        )

    def iter_pricing_matrix(self):
        """Yield the pricing matrices describing ``pricing_data``.

        Only the first three categories (by order) are considered. With one
        or two categories a single matrix without main criteria is yielded;
        with three, one matrix is yielded per criteria of the first category.
        Nothing is yielded when the pricing has no category.
        """
        categories = list(self.pricing.categories.all().order_by('pricingcriteriacategory__order')[:3])
        pricing_data = self.format_pricing_data()

        if not categories:
            return

        if len(categories) < 3:
            yield self.get_pricing_matrix(
                main_criteria=None, categories=categories, pricing_data=pricing_data
            )
            return

        # criterias are ordered
        first_category_id = categories[0].pk
        for criteria in self.pricing.criterias.all():
            # compare ids: `criteria.category` would load the category of each criteria
            if criteria.category_id != first_category_id:
                continue
            yield self.get_pricing_matrix(
                main_criteria=criteria,
                categories=categories[1:],
                pricing_data=pricing_data,
            )

    def get_pricing_matrix(self, main_criteria, categories, pricing_data):
        """Build the matrix of the pricing values of one or two categories.

        :param main_criteria: criteria of a third category every cell is
            restricted to, or None.
        :param categories: one or two categories. With two, the criterias of
            the first one give the cells of each row and those of the second
            one give the rows. With one, its criterias give the rows and each
            row has a single cell whose criteria is None.
        :param pricing_data: result of :meth:`format_pricing_data`.
        :returns: a :class:`PricingMatrix`; the value of a cell is None when
            the pricing is missing or is not a decimal.
        """
        matrix = PricingMatrix(
            criteria=main_criteria,
            rows=[],
        )

        def get_pricing_matrix_cell(criteria_2, criteria_3):
            criterias = [main_criteria, criteria_2, criteria_3]
            key = self.format_pricing_data_key([c.identifier for c in criterias if c])
            try:
                value = decimal.Decimal(str(pricing_data.get(key)))
            except (decimal.InvalidOperation, ValueError, TypeError):
                value = None
            return PricingMatrixCell(criteria=criteria_2, value=value)

        # fetch the criterias once, then split them by category id
        pricing_criterias = list(self.pricing.criterias.all())

        def criterias_of(category):
            return [c for c in pricing_criterias if c.category_id == category.pk]

        if len(categories) < 2:
            criterias_2 = [None]
            criterias_3 = criterias_of(categories[0])
        else:
            criterias_2 = criterias_of(categories[0])
            criterias_3 = criterias_of(categories[1])

        matrix.rows = [
            PricingMatrixRow(
                criteria=criteria_3,
                cells=[get_pricing_matrix_cell(criteria_2, criteria_3) for criteria_2 in criterias_2],
            )
            for criteria_3 in criterias_3
        ]
        return matrix
class BillingDate(models.Model):
    """A labelled billing start date attached to an agenda pricing."""

    agenda_pricing = models.ForeignKey(AgendaPricing, on_delete=models.CASCADE, related_name='billingdates')
    date_start = models.DateField(_('Billing start date'))
    label = models.CharField(_('Label'), max_length=150)

    def __str__(self):
        formatted_date = self.date_start.strftime('%d/%m/%Y')
        return '%s (%s)' % (formatted_date, self.label)

    def export_json(self):
        """Return a JSON-serializable description of the billing date."""
        exported = {
            'date_start': self.date_start.strftime('%Y-%m-%d'),
            'label': self.label,
        }
        return exported

    @classmethod
    def import_json(cls, data):
        """Create or update the billing date of ``data['agenda_pricing']`` starting on ``data['date_start']``."""
        cleaned = clean_import_data(cls, data)
        lookup = {
            'agenda_pricing': cleaned['agenda_pricing'],
            'date_start': cleaned['date_start'],
        }
        cls.objects.update_or_create(defaults=cleaned, **lookup)