lingo/lingo/pricing/models.py

803 lines
28 KiB
Python

# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import dataclasses
import datetime
import decimal
from django.db import models
from django.template import Context, RequestContext, Template, TemplateSyntaxError, VariableDoesNotExist
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from lingo.agendas.models import Agenda, CheckType
from lingo.utils.misc import LingoImportError, clean_import_data, generate_slug
class PricingError(Exception):
def __init__(self, details=None):
self.details = details or {}
super().__init__()
class AgendaPricingNotFound(PricingError):
pass
class PayerError(PricingError):
pass
class PayerDataError(PricingError):
pass
class CriteriaConditionNotFound(PricingError):
pass
class MultipleDefaultCriteriaCondition(PricingError):
pass
class PricingDataError(PricingError):
pass
class PricingDataFormatError(PricingError):
pass
class PricingUnknownCheckStatusError(PricingError):
pass
class PricingEventNotCheckedError(PricingError):
pass
class PricingBookingNotCheckedError(PricingError):
pass
class PricingMultipleBookingError(PricingError):
pass
class PricingBookingCheckTypeError(PricingError):
pass
class CriteriaCategory(models.Model):
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
class Meta:
ordering = ['label']
def __str__(self):
return self.label
def save(self, *args, **kwargs):
if not self.slug:
self.slug = generate_slug(self)
super().save(*args, **kwargs)
@property
def base_slug(self):
return slugify(self.label)
@classmethod
def import_json(cls, data):
criterias = data.pop('criterias', [])
data = clean_import_data(cls, data)
category, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
for criteria in criterias:
criteria['category'] = category
Criteria.import_json(criteria)
return created, category
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'criterias': [a.export_json() for a in self.criterias.all()],
}
class Criteria(models.Model):
category = models.ForeignKey(
CriteriaCategory, verbose_name=_('Category'), on_delete=models.CASCADE, related_name='criterias'
)
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160)
condition = models.CharField(_('Condition'), max_length=1000, blank=True)
order = models.PositiveIntegerField()
default = models.BooleanField(
_('Default criteria'), default=False, help_text=_('Will be applied if no other criteria matches')
)
class Meta:
ordering = ['default', 'order']
unique_together = ['category', 'slug']
def __str__(self):
return self.label
def save(self, *args, **kwargs):
if self.default is True:
self.order = 0
elif self.order is None:
max_order = (
Criteria.objects.filter(category=self.category)
.aggregate(models.Max('order'))
.get('order__max')
or 0
)
self.order = max_order + 1
if not self.slug:
self.slug = generate_slug(self, category=self.category)
super().save(*args, **kwargs)
@property
def base_slug(self):
return slugify(self.label)
@classmethod
def import_json(cls, data):
data = clean_import_data(cls, data)
cls.objects.update_or_create(slug=data['slug'], category=data['category'], defaults=data)
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'condition': self.condition,
'order': self.order,
}
@property
def identifier(self):
return '%s:%s' % (self.category.slug, self.slug)
def compute_condition(self, context):
try:
template = Template('{%% if %s %%}OK{%% endif %%}' % self.condition)
except TemplateSyntaxError:
return False
return template.render(Context(context)) == 'OK'
class Pricing(models.Model):
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
categories = models.ManyToManyField(
CriteriaCategory,
related_name='pricings',
through='PricingCriteriaCategory',
)
criterias = models.ManyToManyField(Criteria)
extra_variables = models.JSONField(blank=True, default=dict)
payer_variables = models.JSONField(blank=True, default=dict)
class Meta:
ordering = ['label']
def __str__(self):
return self.label
def save(self, *args, **kwargs):
if not self.slug:
self.slug = generate_slug(self)
super().save(*args, **kwargs)
@property
def base_slug(self):
return slugify(self.label)
def get_extra_variables(self, request, original_context):
result = {}
context = RequestContext(request)
context.push(original_context)
for key, tplt in (self.extra_variables or {}).items():
try:
result[key] = Template(tplt).render(context)
except (TemplateSyntaxError, VariableDoesNotExist):
continue
return result
def get_extra_variables_keys(self):
return sorted((self.extra_variables or {}).keys())
def get_payer_variables_keys(self):
return [
'payer_external_id',
'payer_external_id_from_nameid',
'payer_first_name',
'payer_last_name',
'payer_demat',
'payer_direct_debit',
]
def get_payer_external_id(self, request, original_context, key='payer_external_id'):
context = RequestContext(request)
context.push(original_context)
tplt = self.payer_variables.get(key) or ''
if not tplt:
raise PayerError(details={'reason': 'empty-template'})
try:
value = Template(tplt).render(context)
if not value:
raise PayerError(details={'reason': 'empty-result'})
return value
except TemplateSyntaxError:
raise PayerError(details={'reason': 'syntax-error'})
except VariableDoesNotExist:
raise PayerError(details={'reason': 'variable-error'})
def get_payer_external_id_from_nameid(self, request, original_context):
return self.get_payer_external_id(request, original_context, key='payer_external_id_from_nameid')
def get_payer_data(self, request, original_context):
result = {}
context = RequestContext(request)
context.push(original_context)
bool_keys = ['payer_demat', 'payer_direct_debit']
for key in self.get_payer_variables_keys():
if key in ['payer_external_id', 'payer_external_id_from_nameid']:
continue
tplt = self.payer_variables.get(key) or ''
if not tplt:
if key not in bool_keys:
raise PayerDataError(
details={'key': key.removeprefix('payer_'), 'reason': 'empty-template'}
)
tplt = 'False'
try:
value = Template(tplt).render(context)
if not value:
if key not in bool_keys:
raise PayerDataError(
details={'key': key.removeprefix('payer_'), 'reason': 'empty-result'}
)
value = False
if key in bool_keys:
if value in ('True', 'true', '1'):
value = True
elif value in ('False', 'false', '0'):
value = False
else:
raise PayerDataError(
details={'key': key.removeprefix('payer_'), 'reason': 'not-a-boolean'}
)
result[key] = value
except TemplateSyntaxError:
raise PayerDataError(details={'key': key.removeprefix('payer_'), 'reason': 'syntax-error'})
except VariableDoesNotExist:
raise PayerDataError(details={'key': key.removeprefix('payer_'), 'reason': 'variable-error'})
return result
@classmethod
def import_json(cls, data):
data = data.copy()
categories = data.pop('categories', [])
categories_by_slug = {c.slug: c for c in CriteriaCategory.objects.all()}
criterias_by_categories_and_slug = {
(crit.category.slug, crit.slug): crit
for crit in Criteria.objects.select_related('category').all()
}
for category_data in categories:
category_slug = category_data['category']
if category_data['category'] not in categories_by_slug:
raise LingoImportError(_('Missing "%s" pricing category') % category_data['category'])
for criteria_slug in category_data['criterias']:
if (category_slug, criteria_slug) not in criterias_by_categories_and_slug:
raise LingoImportError(
_('Missing "%s" pricing criteria for "%s" category') % (criteria_slug, category_slug)
)
data = clean_import_data(cls, data)
pricing, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
PricingCriteriaCategory.objects.filter(pricing=pricing).delete()
criterias = []
for category_data in categories:
pricing.categories.add(
categories_by_slug[category_data['category']],
through_defaults={'order': category_data['order']},
)
for criteria_slug in category_data['criterias']:
criterias.append(criterias_by_categories_and_slug[(category_data['category'], criteria_slug)])
pricing.criterias.set(criterias)
return created, pricing
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'extra_variables': self.extra_variables,
'categories': [pcc.export_json() for pcc in PricingCriteriaCategory.objects.filter(pricing=self)],
}
def duplicate(self, label=None):
# clone current pricing
new_pricing = copy.deepcopy(self)
new_pricing.pk = None
new_pricing.label = label or _('Copy of %s') % self.label
# reset slug
new_pricing.slug = None
new_pricing.save()
# set criterias
new_pricing.criterias.set(self.criterias.all())
# set categories
for pcc in PricingCriteriaCategory.objects.filter(pricing=self):
pcc.duplicate(pricing_target=new_pricing)
return new_pricing
class PricingCriteriaCategory(models.Model):
pricing = models.ForeignKey(Pricing, on_delete=models.CASCADE)
category = models.ForeignKey(CriteriaCategory, on_delete=models.CASCADE)
order = models.PositiveIntegerField()
class Meta:
ordering = ['order']
unique_together = ['pricing', 'category']
def save(self, *args, **kwargs):
if self.order is None:
max_order = (
PricingCriteriaCategory.objects.filter(pricing=self.pricing)
.aggregate(models.Max('order'))
.get('order__max')
or 0
)
self.order = max_order + 1
super().save(*args, **kwargs)
def export_json(self):
return {
'category': self.category.slug,
'order': self.order,
'criterias': [c.slug for c in self.pricing.criterias.all() if c.category == self.category],
}
def duplicate(self, pricing_target):
new_pcc = copy.deepcopy(self)
new_pcc.pk = None
new_pcc.pricing = pricing_target
new_pcc.save()
return new_pcc
@dataclasses.dataclass
class PricingMatrixCell:
criteria: Criteria
value: decimal.Decimal
@dataclasses.dataclass
class PricingMatrixRow:
criteria: Criteria
cells: list[PricingMatrixCell]
@dataclasses.dataclass
class PricingMatrix:
criteria: Criteria
rows: list[PricingMatrixRow]
class AgendaPricing(models.Model):
label = models.CharField(_('Label'), max_length=150, null=True)
slug = models.SlugField(_('Identifier'), max_length=160, null=True)
agendas = models.ManyToManyField(Agenda, related_name='agendapricings')
pricing = models.ForeignKey(Pricing, on_delete=models.CASCADE, verbose_name=_('Pricing model'))
date_start = models.DateField(_('Start date'))
date_end = models.DateField(_('End date'))
flat_fee_schedule = models.BooleanField(_('Flat fee schedule'), default=False)
subscription_required = models.BooleanField(_('Subscription is required'), default=True)
pricing_data = models.JSONField(null=True)
def __str__(self):
return self.label or self.pricing.label
def save(self, *args, **kwargs):
if not self.slug:
self.slug = generate_slug(self)
super().save(*args, **kwargs)
@property
def base_slug(self):
return slugify(self.label or self.pricing.label)
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'pricing': self.pricing.slug,
'date_start': self.date_start.strftime('%Y-%m-%d'),
'date_end': self.date_end.strftime('%Y-%m-%d'),
'pricing_data': self.pricing_data,
'agendas': [a.slug for a in self.agendas.all()],
'billing_dates': [bd.export_json() for bd in self.billingdates.all()],
}
@classmethod
def import_json(cls, data):
data = copy.deepcopy(data)
agenda_slugs = data.pop('agendas', None) or []
billing_dates = data.pop('billing_dates', None) or []
data = clean_import_data(cls, data)
agendas = []
for agenda_slug in agenda_slugs:
try:
agendas.append(Agenda.objects.get(slug=agenda_slug))
except Agenda.DoesNotExist:
raise LingoImportError(_('Missing "%s" agenda') % agenda_slug)
try:
data['pricing'] = Pricing.objects.get(slug=data['pricing'])
except Pricing.DoesNotExist:
raise LingoImportError(_('Missing "%s" pricing model') % data['pricing'])
agenda_pricing, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
agenda_pricing.agendas.add(*agendas)
for billing_date in billing_dates:
billing_date['agenda_pricing'] = agenda_pricing
BillingDate.import_json(billing_date)
return created, agenda_pricing
def get_payer_external_id(self, request, user_external_id):
context = {'user_external_id': user_external_id}
if ':' in user_external_id:
context['user_external_raw_id'] = user_external_id.split(':')[1]
return self.pricing.get_payer_external_id(request, context)
def get_payer_external_id_from_nameid(self, request, nameid):
context = {'nameid': nameid}
return self.pricing.get_payer_external_id_from_nameid(request, context)
def get_payer_data(self, request, payer_external_id):
context = {'payer_external_id': payer_external_id}
if ':' in payer_external_id:
context['payer_external_raw_id'] = payer_external_id.split(':')[1]
return self.pricing.get_payer_data(request, context)
def get_pricing_data(self, request, pricing_date, user_external_id, payer_external_id):
# compute pricing for flat_fee_schedule mode
data = {
'pricing_date': pricing_date, # date to use for QF
}
context = self.get_pricing_context(
request=request,
data=data,
user_external_id=user_external_id,
payer_external_id=payer_external_id,
)
pricing, criterias = self.compute_pricing(context=context)
return {
'pricing': pricing,
'calculation_details': {
'pricing': pricing,
'criterias': criterias,
'context': context,
},
}
def get_pricing_data_for_event(
self, request, agenda, event, check_status, user_external_id, payer_external_id
):
# compute pricing for an event
event_date = datetime.datetime.fromisoformat(event['start_datetime']).date()
data = {
'pricing_date': event_date, # date to use for QF
'event': event,
}
context = self.get_pricing_context(
request=request,
data=data,
user_external_id=user_external_id,
payer_external_id=payer_external_id,
)
pricing, criterias = self.compute_pricing(context=context)
modifier = self.get_booking_modifier(agenda=agenda, check_status=check_status)
return self.aggregate_pricing_data(
pricing=pricing, criterias=criterias, context=context, modifier=modifier
)
def aggregate_pricing_data(self, pricing, criterias, context, modifier):
if modifier['modifier_type'] == 'fixed':
pricing_amount = modifier['modifier_fixed']
else:
pricing_amount = pricing * modifier['modifier_rate'] / 100
return {
'pricing': pricing_amount,
'calculation_details': {
'pricing': pricing,
'criterias': criterias,
'context': context,
},
'booking_details': modifier,
}
@staticmethod
def get_agenda_pricing(agenda, start_date, flat_fee_schedule):
try:
return agenda.agendapricings.get(
date_start__lte=start_date,
date_end__gt=start_date,
flat_fee_schedule=flat_fee_schedule,
)
except (AgendaPricing.DoesNotExist, AgendaPricing.MultipleObjectsReturned):
raise AgendaPricingNotFound
def get_pricing_context(self, request, data, user_external_id, payer_external_id):
context = {'data': data, 'user_external_id': user_external_id, 'payer_external_id': payer_external_id}
if ':' in user_external_id:
context['user_external_raw_id'] = user_external_id.split(':')[1]
if ':' in payer_external_id:
context['payer_external_raw_id'] = payer_external_id.split(':')[1]
return self.pricing.get_extra_variables(request, context)
def format_pricing_data(self):
# format data to ignore category ordering
def _format(data):
if not data:
return
if not isinstance(data, dict):
yield [], data
return
for criteria, val in data.items():
result = list(_format(val))
for criterias, pricing in result:
yield [criteria] + criterias, pricing
return {
self.format_pricing_data_key(criterias): pricing
for criterias, pricing in _format(self.pricing_data)
}
def format_pricing_data_key(self, values):
return '||'.join(sorted(values))
def compute_pricing(self, context):
criterias = {}
categories = []
# for each category
for category in self.pricing.categories.all():
criterias[category.slug] = None
categories.append(category.slug)
# find the first matching criteria (criterias are ordered)
for criteria in self.pricing.criterias.all():
if criteria.category_id != category.pk:
continue
if criteria.default:
continue
condition = criteria.compute_condition(context)
if condition:
criterias[category.slug] = criteria.slug
break
if criterias[category.slug] is not None:
continue
# if no match, take default criteria if only once defined
default_criterias = [
c for c in self.pricing.criterias.all() if c.default and c.category_id == category.pk
]
if len(default_criterias) > 1:
raise MultipleDefaultCriteriaCondition(details={'category': category.slug})
if not default_criterias:
raise CriteriaConditionNotFound(details={'category': category.slug})
criterias[category.slug] = default_criterias[0].slug
# now search for pricing values matching found criterias
pricing_data = self.format_pricing_data()
pricing = pricing_data.get(
self.format_pricing_data_key(['%s:%s' % (k, v) for k, v in criterias.items()])
)
if pricing is None:
raise PricingDataError(details={'criterias': criterias})
try:
pricing = decimal.Decimal(pricing)
except (decimal.InvalidOperation, ValueError, TypeError):
raise PricingDataFormatError(details={'pricing': pricing, 'wanted': 'decimal'})
return pricing, criterias
def get_booking_modifier(self, agenda, check_status):
status = check_status['status']
if status not in ['error', 'not-booked', 'cancelled', 'presence', 'absence']:
raise PricingUnknownCheckStatusError(details={'status': status})
if status == 'error':
reason = check_status['error_reason']
# event must be checked
if reason == 'event-not-checked':
raise PricingEventNotCheckedError
# booking must be checked
if reason == 'booking-not-checked':
raise PricingBookingNotCheckedError
# too many bookings found
if reason == 'too-many-bookings-found':
raise PricingMultipleBookingError
# no booking found
if status == 'not-booked':
return {
'status': 'not-booked',
'modifier_type': 'rate',
'modifier_rate': 0,
}
# booking cancelled
if status == 'cancelled':
return {
'status': 'cancelled',
'modifier_type': 'rate',
'modifier_rate': 0,
}
# no check_type, default rates
if not check_status['check_type']:
return {
'status': status,
'check_type_group': None,
'check_type': None,
'modifier_type': 'rate',
'modifier_rate': 100 if status == 'presence' else 0,
}
try:
check_type = CheckType.objects.get(
group=agenda.check_type_group_id, slug=check_status['check_type']
)
except CheckType.DoesNotExist:
raise PricingBookingCheckTypeError(
details={
'reason': 'not-found',
}
)
# check_type kind and user_was_present mismatch
if check_type.kind != status:
raise PricingBookingCheckTypeError(
details={
'check_type_group': check_type.group.slug,
'check_type': check_type.slug,
'reason': 'wrong-kind',
}
)
# get pricing modifier
if check_type.pricing is not None:
return {
'status': status,
'check_type_group': check_type.group.slug,
'check_type': check_type.slug,
'modifier_type': 'fixed',
'modifier_fixed': check_type.pricing,
}
if check_type.pricing_rate is not None:
return {
'status': status,
'check_type_group': check_type.group.slug,
'check_type': check_type.slug,
'modifier_type': 'rate',
'modifier_rate': check_type.pricing_rate,
}
# pricing not found
raise PricingBookingCheckTypeError(
details={
'check_type_group': check_type.group.slug,
'check_type': check_type.slug,
'reason': 'not-configured',
}
)
def iter_pricing_matrix(self):
categories = self.pricing.categories.all().order_by('pricingcriteriacategory__order')[:3]
pricing_data = self.format_pricing_data()
if not categories:
return
if len(categories) < 3:
yield self.get_pricing_matrix(
main_criteria=None, categories=categories, pricing_data=pricing_data
)
return
# criterias are ordered
for criteria in self.pricing.criterias.all():
if criteria.category != categories[0]:
continue
yield self.get_pricing_matrix(
main_criteria=criteria,
categories=categories[1:],
pricing_data=pricing_data,
)
def get_pricing_matrix(self, main_criteria, categories, pricing_data):
matrix = PricingMatrix(
criteria=main_criteria,
rows=[],
)
def get_pricing_matrix_cell(criteria_2, criteria_3):
criterias = [main_criteria, criteria_2, criteria_3]
key = self.format_pricing_data_key([c.identifier for c in criterias if c])
try:
value = decimal.Decimal(str(pricing_data.get(key)))
except (decimal.InvalidOperation, ValueError, TypeError):
value = None
return PricingMatrixCell(criteria=criteria_2, value=value)
if len(categories) < 2:
criterias_2 = [None]
criterias_3 = [c for c in self.pricing.criterias.all() if c.category == categories[0]]
else:
criterias_2 = [c for c in self.pricing.criterias.all() if c.category == categories[0]]
criterias_3 = [c for c in self.pricing.criterias.all() if c.category == categories[1]]
rows = [
PricingMatrixRow(
criteria=criteria_3,
cells=[
get_pricing_matrix_cell(
criteria_2,
criteria_3,
)
for criteria_2 in criterias_2
],
)
for criteria_3 in criterias_3
]
matrix.rows = rows
return matrix
class BillingDate(models.Model):
agenda_pricing = models.ForeignKey(AgendaPricing, on_delete=models.CASCADE, related_name='billingdates')
date_start = models.DateField(_('Billing start date'))
label = models.CharField(_('Label'), max_length=150)
def __str__(self):
return '%s (%s)' % (self.date_start.strftime('%d/%m/%Y'), self.label)
def export_json(self):
return {
'date_start': self.date_start.strftime('%Y-%m-%d'),
'label': self.label,
}
@classmethod
def import_json(cls, data):
data = clean_import_data(cls, data)
cls.objects.update_or_create(
agenda_pricing=data['agenda_pricing'], date_start=data['date_start'], defaults=data
)