chrono/chrono/pricing/models.py

566 lines
19 KiB
Python

# chrono - agendas system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import dataclasses
import decimal
from django.contrib.postgres.fields import JSONField
from django.db import models
from django.template import Context, RequestContext, Template, TemplateSyntaxError
from django.utils.text import slugify
from django.utils.translation import ugettext_lazy as _
from chrono.agendas.models import Agenda, Booking, Subscription
from chrono.utils.misc import AgendaImportError, clean_import_data, generate_slug
class PricingError(Exception):
    """Base class of the pricing errors; carries a ``details`` dict.

    The exception message itself is always empty: the information is only
    available through the ``details`` attribute.
    """

    def __init__(self, details=None):
        # any falsy value (None, empty dict, ...) is normalised to a new dict
        self.details = details if details else {}
        super().__init__()


class AgendaPricingNotFound(PricingError):
    """Zero or several agenda pricings match the date of the event."""


class CriteriaConditionNotFound(PricingError):
    """No criteria of a category has a condition matching the context."""


class PricingDataError(PricingError):
    """The pricing data has no entry for the matching criteria."""


class PricingDataFormatError(PricingError):
    """The pricing data has not the wanted type (dict or decimal)."""


class PricingEventNotCheckedError(PricingError):
    """The event is not marked as checked."""


class PricingBookingNotCheckedError(PricingError):
    """The booking is not cancelled and the presence of the user is unknown."""


class PricingSubscriptionError(PricingError):
    """Zero or several subscriptions match the user at the date of the event."""


class PricingMultipleBookingError(PricingError):
    """Several bookings of the event match the user."""


class PricingBookingCheckTypeError(PricingError):
    """The check type of the booking has the wrong kind or no pricing set."""
class CriteriaCategory(models.Model):
    """Category grouping pricing criterias, identified by a unique slug."""

    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)

    class Meta:
        ordering = ['label']

    def __str__(self):
        return self.label

    def save(self, *args, **kwargs):
        """Save the category, generating a slug first when none is set."""
        if not self.slug:
            self.slug = generate_slug(self)
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slugified label (presumably the base used by generate_slug - to be confirmed)."""
        return slugify(self.label)

    @classmethod
    def import_json(cls, data, overwrite=False):
        """Create or update a category, and its criterias, from exported data.

        ``data`` is a dict such as the one built by ``export_json``. It is
        left untouched: the import works on copies, like
        ``Pricing.import_json`` does.

        When ``overwrite`` is true, the existing criterias of the category
        are deleted before the import of the criterias found in ``data``.

        Returns a ``(created, category)`` tuple.
        """
        # work on a copy: pop() below must not remove the key from the
        # payload of the caller
        data = data.copy()
        criterias = data.pop('criterias', [])
        data = clean_import_data(cls, data)
        category, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)

        if overwrite:
            Criteria.objects.filter(category=category).delete()

        for criteria in criterias:
            # new dict for each criteria, so that the model instance is not
            # written into the nested dicts of the caller
            Criteria.import_json({**criteria, 'category': category})

        return created, category

    def export_json(self):
        """Return the category and its criterias as a JSON serializable dict."""
        return {
            'label': self.label,
            'slug': self.slug,
            'criterias': [a.export_json() for a in self.criterias.all()],
        }
class Criteria(models.Model):
    """Criteria of a category, matched by evaluating a template condition."""

    category = models.ForeignKey(
        CriteriaCategory,
        verbose_name=_('Category'),
        on_delete=models.CASCADE,
        related_name='criterias',
    )
    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160)
    condition = models.CharField(_('Condition'), max_length=1000)
    order = models.PositiveIntegerField()

    class Meta:
        ordering = ['order']
        unique_together = ['category', 'slug']

    def __str__(self):
        return self.label

    def save(self, *args, **kwargs):
        """Save the criteria, filling the missing order and slug first."""
        if self.order is None:
            # append after the last criteria of the same category
            siblings = Criteria.objects.filter(category=self.category)
            highest = siblings.aggregate(models.Max('order')).get('order__max')
            self.order = (highest or 0) + 1
        if not self.slug:
            self.slug = generate_slug(self, category=self.category)
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slugified label of the criteria."""
        return slugify(self.label)

    @classmethod
    def import_json(cls, data):
        """Create or update the criteria matching the slug and category of ``data``."""
        cleaned = clean_import_data(cls, data)
        cls.objects.update_or_create(
            slug=cleaned['slug'],
            category=cleaned['category'],
            defaults=cleaned,
        )

    def export_json(self):
        """Return the criteria as a JSON serializable dict (category excluded)."""
        exported = {}
        for name in ('label', 'slug', 'condition', 'order'):
            exported[name] = getattr(self, name)
        return exported

    @property
    def identifier(self):
        """Key of the criteria in pricing data: category slug, colon, criteria slug."""
        category_slug = self.category.slug
        return '%s:%s' % (category_slug, self.slug)

    def compute_condition(self, context):
        """Tell if the condition is true for the given context dict.

        The condition is evaluated as the expression of a Django ``{% if %}``
        tag; a condition which is not a valid expression never matches.
        """
        source = '{%% if %s %%}OK{%% endif %%}' % self.condition
        try:
            template = Template(source)
        except TemplateSyntaxError:
            return False
        rendered = template.render(Context(context))
        return rendered == 'OK'
class Pricing(models.Model):
    """Pricing model: ordered categories, selected criterias and extra variables."""

    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
    categories = models.ManyToManyField(
        CriteriaCategory,
        related_name='pricings',
        through='PricingCriteriaCategory',
    )
    criterias = models.ManyToManyField(Criteria)
    extra_variables = JSONField(blank=True, default=dict)

    class Meta:
        ordering = ['label']

    def __str__(self):
        return self.label

    def save(self, *args, **kwargs):
        """Save the pricing, generating a slug first when none is set."""
        if not self.slug:
            self.slug = generate_slug(self)
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slugified label of the pricing."""
        return slugify(self.label)

    def get_extra_variables(self, request, original_context):
        """Render each extra variable template and return the results by key.

        Templates are rendered with a request context on top of which
        ``original_context`` is pushed. Templates with a syntax error are
        skipped, their key is then missing from the result.
        """
        context = RequestContext(request)
        context.push(original_context)
        rendered = {}
        for name, source in (self.extra_variables or {}).items():
            try:
                compiled = Template(source)
            except TemplateSyntaxError:
                continue
            rendered[name] = compiled.render(context)
        return rendered

    def get_extra_variables_keys(self):
        """Return the sorted names of the extra variables."""
        variables = self.extra_variables or {}
        return sorted(variables.keys())

    @classmethod
    def import_json(cls, data, overwrite=False):
        """Create or update a pricing from exported data.

        All the categories and criterias referenced by ``data`` must already
        exist, else ``AgendaImportError`` is raised before anything is
        written. The links of the pricing to its categories are always
        replaced; ``overwrite`` is accepted but not used here.

        Returns a ``(created, pricing)`` tuple.
        """
        data = data.copy()
        categories = data.pop('categories', [])

        known_categories = {category.slug: category for category in CriteriaCategory.objects.all()}
        known_criterias = {
            (criteria.category.slug, criteria.slug): criteria
            for criteria in Criteria.objects.select_related('category').all()
        }

        # first pass: check that every reference can be resolved
        for category_data in categories:
            category_slug = category_data['category']
            if category_slug not in known_categories:
                raise AgendaImportError(_('Missing "%s" pricing category') % category_slug)
            for criteria_slug in category_data['criterias']:
                if (category_slug, criteria_slug) not in known_criterias:
                    raise AgendaImportError(
                        _('Missing "%s" pricing criteria for "%s" category') % (criteria_slug, category_slug)
                    )

        data = clean_import_data(cls, data)
        pricing, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)

        # second pass: rebuild the links to categories and criterias
        PricingCriteriaCategory.objects.filter(pricing=pricing).delete()
        selected_criterias = []
        for category_data in categories:
            category_slug = category_data['category']
            pricing.categories.add(
                known_categories[category_slug],
                through_defaults={'order': category_data['order']},
            )
            selected_criterias.extend(
                known_criterias[(category_slug, criteria_slug)]
                for criteria_slug in category_data['criterias']
            )
        pricing.criterias.set(selected_criterias)

        return created, pricing

    def export_json(self):
        """Return the pricing and its linked categories as a JSON serializable dict."""
        links = PricingCriteriaCategory.objects.filter(pricing=self)
        return {
            'label': self.label,
            'slug': self.slug,
            'extra_variables': self.extra_variables,
            'categories': [link.export_json() for link in links],
        }
class PricingCriteriaCategory(models.Model):
    """Ordered link between a pricing and one of its criteria categories."""

    pricing = models.ForeignKey(Pricing, on_delete=models.CASCADE)
    category = models.ForeignKey(CriteriaCategory, on_delete=models.CASCADE)
    order = models.PositiveIntegerField()

    class Meta:
        ordering = ['order']
        unique_together = ['pricing', 'category']

    def save(self, *args, **kwargs):
        """Save the link, placing it after the last one of the pricing if unordered."""
        if self.order is None:
            links = PricingCriteriaCategory.objects.filter(pricing=self.pricing)
            highest = links.aggregate(models.Max('order')).get('order__max')
            self.order = (highest or 0) + 1
        super().save(*args, **kwargs)

    def export_json(self):
        """Return the category slug, the order and the slugs of the selected criterias."""
        exported = {
            'category': self.category.slug,
            'order': self.order,
        }
        # only the criterias of the pricing which belong to this category
        criteria_slugs = []
        for criteria in self.pricing.criterias.all():
            if criteria.category == self.category:
                criteria_slugs.append(criteria.slug)
        exported['criterias'] = criteria_slugs
        return exported
@dataclasses.dataclass
class PricingMatrixCell:
    """Cell of a pricing matrix: a column criteria and the price found for it.

    ``AgendaPricing.get_pricing_matrix`` sets ``criteria`` to None for
    matrices with a single column, and ``value`` to None when the pricing
    data holds no valid decimal for the cell.
    """

    # criteria of the column
    criteria: Criteria
    # price of the cell
    value: decimal.Decimal
@dataclasses.dataclass
class PricingMatrixRow:
    """Row of a pricing matrix: the criteria of the row and its cells."""

    # criteria shared by all the cells of the row
    criteria: Criteria
    # one cell per column of the matrix
    cells: list[PricingMatrixCell]
@dataclasses.dataclass
class PricingMatrix:
    """Two-dimensional table of prices.

    ``criteria`` is the criteria of the first category the matrix is built
    for; ``AgendaPricing.iter_pricing_matrix`` sets it to None when the
    pricing has fewer than three categories.
    """

    # criteria the whole matrix applies to
    criteria: Criteria
    # rows of the matrix
    rows: list[PricingMatrixRow]
class AgendaPricing(models.Model):
    """Pricing model applied to an agenda for a period, with its price values.

    ``pricing_data`` is a tree of nested dicts, one level per category of
    the pricing (in order), keyed by criteria identifier
    (``category-slug:criteria-slug``); the leaves are the prices.
    """

    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
    pricing = models.ForeignKey(Pricing, on_delete=models.CASCADE)
    date_start = models.DateField()
    date_end = models.DateField()
    pricing_data = JSONField(null=True)

    @classmethod
    def import_json(cls, data):
        """Create or update the agenda pricing matching agenda, pricing and dates."""
        data = clean_import_data(cls, data)
        cls.objects.update_or_create(
            agenda=data['agenda'],
            pricing=data['pricing'],
            date_start=data['date_start'],
            date_end=data['date_end'],
            defaults=data,
        )

    def export_json(self):
        """Return the agenda pricing as a JSON serializable dict (agenda excluded)."""
        return {
            'pricing': self.pricing.slug,
            'date_start': self.date_start.strftime('%Y-%m-%d'),
            'date_end': self.date_end.strftime('%Y-%m-%d'),
            'pricing_data': self.pricing_data,
        }

    @staticmethod
    def get_pricing_data(request, event, user_external_id, adult_external_id):
        """Compute the amount to charge to a user for an event.

        Returns the dict built by ``aggregate_pricing_data``. The steps
        called here raise subclasses of ``PricingError`` when the pricing
        can not be determined.
        """
        agenda_pricing = AgendaPricing.get_agenda_pricing(event)
        context = agenda_pricing.get_pricing_context(request, user_external_id, adult_external_id)
        pricing, criterias = agenda_pricing.compute_pricing(context)
        modifier = agenda_pricing.get_booking_modifier(event, user_external_id)
        return agenda_pricing.aggregate_pricing_data(pricing, criterias, context, modifier)

    def aggregate_pricing_data(self, pricing, criterias, context, modifier):
        """Apply the booking modifier to the base pricing and build the result.

        A ``fixed`` modifier replaces the pricing by its own amount, any
        other modifier is a percentage applied to the pricing.
        """
        if modifier['modifier_type'] == 'fixed':
            pricing_amount = modifier['modifier_fixed']
        else:
            pricing_amount = pricing * modifier['modifier_rate'] / 100
        return {
            'pricing': pricing_amount,
            'calculation_details': {
                'pricing': pricing,
                'criterias': criterias,
                'context': context,
            },
            'booking_details': modifier,
        }

    @staticmethod
    def get_agenda_pricing(event):
        """Return the only agenda pricing of the agenda covering the event start.

        ``date_start`` is included in the period, ``date_end`` is excluded.
        Raises ``AgendaPricingNotFound`` if zero or several pricings match.
        """
        agenda = event.agenda
        try:
            return agenda.agendapricing_set.get(
                date_start__lte=event.start_datetime,
                date_end__gt=event.start_datetime,
            )
        except (AgendaPricing.DoesNotExist, AgendaPricing.MultipleObjectsReturned):
            raise AgendaPricingNotFound

    def get_subscription(self, event, user_external_id):
        """Return the only subscription of the user covering the event start.

        Raises ``PricingSubscriptionError`` if zero or several subscriptions
        match.
        """
        try:
            return self.agenda.subscriptions.get(
                user_external_id=user_external_id,
                date_start__lte=event.start_datetime,
                date_end__gt=event.start_datetime,
            )
        except (Subscription.DoesNotExist, Subscription.MultipleObjectsReturned):
            raise PricingSubscriptionError

    def get_pricing_context(self, request, user_external_id, adult_external_id):
        """Return the extra variables of the pricing, rendered for the given ids.

        The ids, and their raw form when they contain a colon, are only
        available to the templates of the extra variables: the returned
        dict holds the rendered extra variables, not the ids themselves.
        """
        context = {'user_external_id': user_external_id, 'adult_external_id': adult_external_id}
        # NOTE(review): only the part between the first and the second colon
        # is kept - confirm that raw ids never contain a colon
        if ':' in user_external_id:
            context['user_external_raw_id'] = user_external_id.split(':')[1]
        if ':' in adult_external_id:
            context['adult_external_raw_id'] = adult_external_id.split(':')[1]
        return self.pricing.get_extra_variables(request, context)

    def compute_pricing(self, context):
        """Find the matching criteria of each category and the related price.

        Returns a ``(pricing, criterias)`` tuple: the price as a decimal and
        a dict giving the slug of the matching criteria by category slug.

        Raises ``CriteriaConditionNotFound`` when a category has no matching
        criteria, ``PricingDataError`` when the pricing data has no entry
        for a matching criteria and ``PricingDataFormatError`` when the
        pricing data is not made of nested dicts ending with a decimal.
        """
        criterias = {}
        categories = []
        # for each category (ordered)
        for category in self.pricing.categories.all().order_by('pricingcriteriacategory__order'):
            criterias[category.slug] = None
            categories.append(category.slug)
            # find the first matching criteria (criterias are ordered)
            for criteria in self.pricing.criterias.filter(category=category):
                condition = criteria.compute_condition(context)
                if condition:
                    criterias[category.slug] = criteria.slug
                    break

        # now search for pricing values matching found criterias
        pricing_data = self.pricing_data
        # for each category (ordered), go down one level in the pricing data
        for category in categories:
            criteria = criterias[category]
            if criteria is None:
                raise CriteriaConditionNotFound(details={'category': category})
            if not isinstance(pricing_data, dict):
                raise PricingDataFormatError(
                    details={'category': category, 'pricing': pricing_data, 'wanted': 'dict'}
                )
            key = '%s:%s' % (category, criteria)
            if key not in pricing_data:
                raise PricingDataError(details={'category': category, 'criteria': criteria})
            pricing_data = pricing_data[key]

        try:
            if isinstance(pricing_data, float):
                # non integer JSON numbers are loaded as floats: go through
                # str() to get Decimal('1.1') and not the binary expansion
                # of the float (1.100000000000000088...), as it is done for
                # the cells of the pricing matrix
                pricing = decimal.Decimal(str(pricing_data))
            else:
                pricing = decimal.Decimal(pricing_data)
        except (decimal.InvalidOperation, ValueError, TypeError):
            raise PricingDataFormatError(details={'pricing': pricing_data, 'wanted': 'decimal'})

        return pricing, criterias

    def get_booking_modifier(self, event, user_external_id):
        """Return how the booking state of the user alters the pricing.

        The result is a dict with a ``status``, a ``modifier_type``
        (``rate`` or ``fixed``) and the related ``modifier_rate`` (a
        percentage) or ``modifier_fixed`` (an amount), plus the check type
        slugs when the presence or absence has been checked.

        Raises ``PricingEventNotCheckedError``, ``PricingSubscriptionError``,
        ``PricingMultipleBookingError``, ``PricingBookingNotCheckedError``
        or ``PricingBookingCheckTypeError``.
        """
        # event must be checked
        if event.checked is False:
            raise PricingEventNotCheckedError

        # search for an available subscription (only its existence matters)
        self.get_subscription(event, user_external_id)

        # search for a booking
        try:
            booking = event.booking_set.get(user_external_id=user_external_id)
        except Booking.DoesNotExist:
            # no booking
            return {
                'status': 'not-booked',
                'modifier_type': 'rate',
                'modifier_rate': 0,
            }
        except Booking.MultipleObjectsReturned:
            raise PricingMultipleBookingError

        # booking cancelled
        if booking.cancellation_datetime is not None:
            return {
                'status': 'cancelled',
                'modifier_type': 'rate',
                'modifier_rate': 0,
            }

        # booking not cancelled, it must be checked
        if booking.user_was_present is None:
            raise PricingBookingNotCheckedError

        status = 'presence' if booking.user_was_present else 'absence'

        # no check_type, default rates
        if booking.user_check_type is None:
            return {
                'status': status,
                'check_type_group': None,
                'check_type': None,
                'modifier_type': 'rate',
                'modifier_rate': 100 if booking.user_was_present else 0,
            }

        check_type = booking.user_check_type
        kind_mapping = {'presence': True, 'absence': False}
        # check_type kind and user_was_present mismatch
        if kind_mapping[check_type.kind] != booking.user_was_present:
            raise PricingBookingCheckTypeError(
                details={
                    'check_type_group': check_type.group.slug,
                    'check_type': check_type.slug,
                    'reason': 'wrong-kind',
                }
            )

        # get pricing modifier, a fixed amount takes precedence over a rate
        if check_type.pricing is not None:
            return {
                'status': status,
                'check_type_group': check_type.group.slug,
                'check_type': check_type.slug,
                'modifier_type': 'fixed',
                'modifier_fixed': check_type.pricing,
            }
        if check_type.pricing_rate is not None:
            return {
                'status': status,
                'check_type_group': check_type.group.slug,
                'check_type': check_type.slug,
                'modifier_type': 'rate',
                'modifier_rate': check_type.pricing_rate,
            }

        # pricing not found
        raise PricingBookingCheckTypeError(
            details={
                'check_type_group': check_type.group.slug,
                'check_type': check_type.slug,
                'reason': 'not-configured',
            }
        )

    def iter_pricing_matrix(self):
        """Yield the pricing matrices built from the first three categories.

        Nothing is yielded without categories. With one or two categories a
        single matrix is yielded; with three, one matrix is yielded for each
        selected criteria of the first category.
        """
        categories = self.pricing.categories.all().order_by('pricingcriteriacategory__order')[:3]
        pricing_data = self.pricing_data or {}

        if not categories:
            return

        if len(categories) < 3:
            yield self.get_pricing_matrix(
                main_criteria=None, categories=categories, pricing_data=pricing_data
            )
            return

        # criterias are ordered
        for criteria in self.pricing.criterias.all():
            if criteria.category != categories[0]:
                continue
            yield self.get_pricing_matrix(
                main_criteria=criteria,
                categories=categories[1:],
                pricing_data=pricing_data.get(criteria.identifier) or {},
            )

    def get_pricing_matrix(self, main_criteria, categories, pricing_data):
        """Build the matrix of the prices for one or two categories.

        With one category, each row holds a single cell. With two, columns
        are the criterias of the first category and rows the criterias of
        the second one. ``pricing_data`` is the part of the pricing data
        related to ``main_criteria``. Cells without a valid decimal in the
        pricing data get a None value.
        """
        matrix = PricingMatrix(
            criteria=main_criteria,
            rows=[],
        )

        def get_pricing_matrix_cell(criteria_2, criteria_3, _pricing_data):
            # str() of a missing value (None) is not a valid decimal, the
            # cell is then empty
            try:
                value = decimal.Decimal(str(_pricing_data.get(criteria_3.identifier)))
            except (decimal.InvalidOperation, ValueError, TypeError):
                value = None
            return PricingMatrixCell(criteria=criteria_2, value=value)

        if len(categories) < 2:
            rows = [
                PricingMatrixRow(
                    criteria=criteria,
                    cells=[get_pricing_matrix_cell(None, criteria, pricing_data)],
                )
                for criteria in self.pricing.criterias.all()
                if criteria.category == categories[0]
            ]
            matrix.rows = rows
        else:
            criterias_2 = [c for c in self.pricing.criterias.all() if c.category == categories[0]]
            criterias_3 = [c for c in self.pricing.criterias.all() if c.category == categories[1]]
            rows = [
                PricingMatrixRow(
                    criteria=criteria_3,
                    cells=[
                        get_pricing_matrix_cell(
                            criteria_2,
                            criteria_3,
                            pricing_data.get(criteria_2.identifier) or {},
                        )
                        for criteria_2 in criterias_2
                    ],
                )
                for criteria_3 in criterias_3
            ]
            matrix.rows = rows

        return matrix