491 lines
19 KiB
Python
491 lines
19 KiB
Python
# lingo - payment and billing system
|
|
# Copyright (C) 2022 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
|
|
from django import forms
|
|
from django.forms import ValidationError
|
|
from django.template import Template, TemplateSyntaxError
|
|
from django.utils.timezone import now
|
|
from django.utils.translation import pgettext_lazy
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
|
from lingo.agendas.chrono import ChronoError, get_event, get_subscriptions
|
|
from lingo.agendas.models import Agenda, CheckType
|
|
from lingo.pricing.models import AgendaPricing, BillingDate, Criteria, CriteriaCategory, PricingError
|
|
|
|
|
|
class ExportForm(forms.Form):
|
|
agendas = forms.BooleanField(label=_('Agendas'), required=False, initial=True)
|
|
check_type_groups = forms.BooleanField(label=_('Check type groups'), required=False, initial=True)
|
|
pricing_categories = forms.BooleanField(
|
|
label=_('Pricing criteria categories'), required=False, initial=True
|
|
)
|
|
pricing_models = forms.BooleanField(label=_('Pricing models'), required=False, initial=True)
|
|
pricings = forms.BooleanField(
|
|
label=pgettext_lazy('agenda pricing', 'Pricings'), required=False, initial=True
|
|
)
|
|
|
|
|
|
class ImportForm(forms.Form):
|
|
config_json = forms.FileField(label=_('Export File'))
|
|
|
|
|
|
class NewCriteriaForm(forms.ModelForm):
|
|
class Meta:
|
|
model = Criteria
|
|
fields = ['label', 'default', 'condition']
|
|
|
|
def clean(self):
|
|
cleaned_data = super().clean()
|
|
|
|
if cleaned_data.get('default') is True:
|
|
cleaned_data['condition'] = ''
|
|
else:
|
|
condition = cleaned_data['condition']
|
|
if not condition:
|
|
self.add_error('condition', self.fields['condition'].default_error_messages['required'])
|
|
else:
|
|
try:
|
|
Template('{%% if %s %%}OK{%% endif %%}' % condition)
|
|
except TemplateSyntaxError:
|
|
self.add_error('condition', _('Invalid syntax.'))
|
|
|
|
return cleaned_data
|
|
|
|
|
|
class CriteriaForm(NewCriteriaForm):
|
|
class Meta:
|
|
model = Criteria
|
|
fields = ['label', 'slug', 'default', 'condition']
|
|
|
|
def clean_slug(self):
|
|
slug = self.cleaned_data['slug']
|
|
|
|
if self.instance.category.criterias.filter(slug=slug).exclude(pk=self.instance.pk).exists():
|
|
raise ValidationError(_('Another criteria exists with the same identifier.'))
|
|
|
|
return slug
|
|
|
|
|
|
class PricingDuplicateForm(forms.Form):
|
|
label = forms.CharField(label=_('New label'), max_length=150, required=False)
|
|
|
|
|
|
class PricingVariableForm(forms.Form):
|
|
key = forms.CharField(label=_('Variable name'), required=False)
|
|
value = forms.CharField(
|
|
label=_('Value template'), widget=forms.TextInput(attrs={'size': 60}), required=False
|
|
)
|
|
|
|
|
|
PricingVariableFormSet = forms.formset_factory(PricingVariableForm)
|
|
|
|
|
|
class PricingCriteriaCategoryAddForm(forms.Form):
|
|
category = forms.ModelChoiceField(
|
|
label=_('Criteria category to add'), queryset=CriteriaCategory.objects.none(), required=True
|
|
)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.pricing = kwargs.pop('pricing')
|
|
super().__init__(*args, **kwargs)
|
|
self.fields['category'].queryset = CriteriaCategory.objects.exclude(pricings=self.pricing)
|
|
|
|
|
|
class PricingCriteriaCategoryEditForm(forms.Form):
|
|
criterias = forms.ModelMultipleChoiceField(
|
|
label=_('Criterias'),
|
|
queryset=Criteria.objects.none(),
|
|
required=True,
|
|
widget=forms.CheckboxSelectMultiple(),
|
|
)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.pricing = kwargs.pop('pricing')
|
|
self.category = kwargs.pop('category')
|
|
super().__init__(*args, **kwargs)
|
|
self.fields['criterias'].queryset = self.category.criterias.all()
|
|
self.initial['criterias'] = self.pricing.criterias.filter(category=self.category)
|
|
|
|
|
|
class NewAgendaPricingForm(forms.ModelForm):
|
|
class Meta:
|
|
model = AgendaPricing
|
|
fields = ['label', 'pricing', 'date_start', 'date_end', 'flat_fee_schedule', 'subscription_required']
|
|
widgets = {
|
|
'date_start': forms.DateInput(attrs={'type': 'date'}, format='%Y-%m-%d'),
|
|
'date_end': forms.DateInput(attrs={'type': 'date'}, format='%Y-%m-%d'),
|
|
}
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
if not self.instance.pk:
|
|
last_date_end = (
|
|
AgendaPricing.objects.all().order_by('date_end').values_list('date_end', flat=True).last()
|
|
)
|
|
self.initial['date_start'] = last_date_end or now().date()
|
|
|
|
def clean(self):
|
|
cleaned_data = super().clean()
|
|
|
|
if 'date_start' in cleaned_data and 'date_end' in cleaned_data:
|
|
if cleaned_data['date_end'] <= cleaned_data['date_start']:
|
|
self.add_error('date_end', _('End date must be greater than start date.'))
|
|
if 'flat_fee_schedule' in cleaned_data and 'subscription_required' in cleaned_data:
|
|
if not cleaned_data['flat_fee_schedule']:
|
|
cleaned_data['subscription_required'] = True
|
|
|
|
return cleaned_data
|
|
|
|
|
|
class AgendaPricingForm(NewAgendaPricingForm):
|
|
class Meta:
|
|
model = AgendaPricing
|
|
fields = [
|
|
'label',
|
|
'slug',
|
|
'pricing',
|
|
'date_start',
|
|
'date_end',
|
|
'flat_fee_schedule',
|
|
'subscription_required',
|
|
]
|
|
widgets = {
|
|
'date_start': forms.DateInput(attrs={'type': 'date'}, format='%Y-%m-%d'),
|
|
'date_end': forms.DateInput(attrs={'type': 'date'}, format='%Y-%m-%d'),
|
|
}
|
|
|
|
def clean_slug(self):
|
|
slug = self.cleaned_data['slug']
|
|
|
|
if AgendaPricing.objects.filter(slug=slug).exclude(pk=self.instance.pk).exists():
|
|
raise ValidationError(_('Another pricing exists with the same identifier.'))
|
|
|
|
return slug
|
|
|
|
def clean_subscription_required(self):
|
|
subscription_required = self.cleaned_data['subscription_required']
|
|
if subscription_required is False and self.instance.subscription_required is True:
|
|
# value has changed, check linked agendas
|
|
if self.instance.agendas.exists():
|
|
raise forms.ValidationError(
|
|
_('Some agendas are linked to this pricing; please unlink them first.')
|
|
)
|
|
return subscription_required
|
|
|
|
def clean(self):
|
|
cleaned_data = super().clean()
|
|
if (
|
|
'date_start' in cleaned_data
|
|
and 'date_end' in cleaned_data
|
|
and 'flat_fee_schedule' in cleaned_data
|
|
):
|
|
old_date_start = self.instance.date_start
|
|
old_date_end = self.instance.date_end
|
|
old_flat_fee_schedule = self.instance.flat_fee_schedule
|
|
new_date_start = cleaned_data['date_start']
|
|
new_date_end = cleaned_data['date_end']
|
|
new_flat_fee_schedule = cleaned_data['flat_fee_schedule']
|
|
if (
|
|
old_date_start != new_date_start
|
|
or old_date_end != new_date_end
|
|
or old_flat_fee_schedule != new_flat_fee_schedule
|
|
):
|
|
overlapping_qs = (
|
|
AgendaPricing.objects.filter(flat_fee_schedule=new_flat_fee_schedule)
|
|
.exclude(pk=self.instance.pk)
|
|
.extra(
|
|
where=["(date_start, date_end) OVERLAPS (%s, %s)"],
|
|
params=[new_date_start, new_date_end],
|
|
)
|
|
)
|
|
for agenda in self.instance.agendas.all():
|
|
if overlapping_qs.filter(agendas=agenda).exists():
|
|
self.add_error(
|
|
None,
|
|
_('Agenda "%s" has already a pricing overlapping this period.') % agenda.label,
|
|
)
|
|
if (
|
|
old_flat_fee_schedule != new_flat_fee_schedule
|
|
and new_flat_fee_schedule is False
|
|
and self.instance.billingdates.exists()
|
|
):
|
|
self.add_error(
|
|
'flat_fee_schedule',
|
|
_('Some billing dates are are defined for this pricing; please delete them first.'),
|
|
)
|
|
if (
|
|
old_flat_fee_schedule == new_flat_fee_schedule
|
|
and new_flat_fee_schedule is True
|
|
and (old_date_start != new_date_start or old_date_end != new_date_end)
|
|
):
|
|
if (
|
|
self.instance.billingdates.filter(date_start__lt=new_date_start).exists()
|
|
or self.instance.billingdates.filter(date_start__gte=new_date_end).exists()
|
|
):
|
|
self.add_error(None, _('Some billing dates are outside the pricing period.'))
|
|
|
|
return cleaned_data
|
|
|
|
|
|
class AgendaPricingAgendaAddForm(forms.Form):
|
|
agenda = forms.ModelChoiceField(label=_('Agenda to add'), queryset=Agenda.objects.none(), required=True)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.agenda_pricing = kwargs.pop('agenda_pricing')
|
|
super().__init__(*args, **kwargs)
|
|
self.fields['agenda'].queryset = Agenda.objects.exclude(agendapricings=self.agenda_pricing)
|
|
|
|
def clean_agenda(self):
|
|
agenda = self.cleaned_data['agenda']
|
|
overlapping_qs = AgendaPricing.objects.filter(
|
|
flat_fee_schedule=self.agenda_pricing.flat_fee_schedule, agendas=agenda
|
|
).extra(
|
|
where=["(date_start, date_end) OVERLAPS (%s, %s)"],
|
|
params=[self.agenda_pricing.date_start, self.agenda_pricing.date_end],
|
|
)
|
|
if overlapping_qs.exists():
|
|
raise forms.ValidationError(_('This agendas has already a pricing overlapping this period.'))
|
|
return agenda
|
|
|
|
|
|
class AgendaPricingBillingDateForm(forms.ModelForm):
|
|
class Meta:
|
|
model = BillingDate
|
|
fields = ['date_start', 'label']
|
|
widgets = {
|
|
'date_start': forms.DateInput(attrs={'type': 'date'}, format='%Y-%m-%d'),
|
|
}
|
|
|
|
def clean_date_start(self):
|
|
date_start = self.cleaned_data['date_start']
|
|
if (
|
|
date_start < self.instance.agenda_pricing.date_start
|
|
or date_start >= self.instance.agenda_pricing.date_end
|
|
):
|
|
raise forms.ValidationError(_('The billing start date must be within the period of the pricing.'))
|
|
return date_start
|
|
|
|
|
|
class PricingMatrixForm(forms.Form):
|
|
def __init__(self, *args, **kwargs):
|
|
matrix = kwargs.pop('matrix')
|
|
super().__init__(*args, **kwargs)
|
|
for i in range(len(matrix.rows[0].cells)):
|
|
self.fields['crit_%i' % i] = forms.DecimalField(required=True, max_digits=5, decimal_places=2)
|
|
|
|
|
|
class PricingTestToolForm(forms.Form):
|
|
agenda = forms.ModelChoiceField(label=_('Agenda'), empty_label=None, queryset=Agenda.objects.none())
|
|
event_slug = forms.CharField(label=_('Event identifier'))
|
|
billing_date = forms.ModelChoiceField(
|
|
label=_('Billing date'), empty_label=None, queryset=BillingDate.objects.none()
|
|
)
|
|
user_external_id = forms.CharField(label=_('User external identifier'))
|
|
adult_external_id = forms.CharField(label=_('Adult external identifier'))
|
|
booking_status = forms.ChoiceField(label=_('Booking status'), choices=[])
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.request = kwargs.pop('request')
|
|
self.agenda_pricing = kwargs.pop('agenda_pricing')
|
|
self.agenda = None
|
|
if kwargs['data'] and kwargs['data'].get('agenda'):
|
|
self.init_agenda(kwargs['data']['agenda'])
|
|
self.serialized_event = None
|
|
self.serialized_subscription = None
|
|
self.check_type_slug = None
|
|
self.booking_status = None
|
|
super().__init__(*args, **kwargs)
|
|
if self.agenda_pricing.subscription_required:
|
|
self.fields['agenda'].queryset = self.agenda_pricing.agendas.all()
|
|
else:
|
|
del self.fields['agenda']
|
|
if self.agenda_pricing.flat_fee_schedule:
|
|
del self.fields['event_slug']
|
|
del self.fields['booking_status']
|
|
self.init_billing_date()
|
|
else:
|
|
del self.fields['billing_date']
|
|
self.init_booking_status()
|
|
|
|
def init_agenda(self, agenda_id):
|
|
try:
|
|
self.agenda = self.agenda_pricing.agendas.get(pk=agenda_id)
|
|
except Agenda.DoesNotExist:
|
|
pass
|
|
|
|
def init_booking_status(self):
|
|
presence_check_types = (
|
|
self.agenda.check_type_group.check_types.presences()
|
|
if self.agenda and self.agenda.check_type_group
|
|
else []
|
|
)
|
|
absence_check_types = (
|
|
self.agenda.check_type_group.check_types.absences()
|
|
if self.agenda and self.agenda.check_type_group
|
|
else []
|
|
)
|
|
status_choices = [
|
|
('presence', _('Presence')),
|
|
]
|
|
status_choices += [
|
|
('presence::%s' % ct.slug, _('Presence (%s)') % ct.label) for ct in presence_check_types
|
|
]
|
|
status_choices += [('absence', _('Absence'))]
|
|
status_choices += [
|
|
('absence::%s' % ct.slug, _('Absence (%s)') % ct.label) for ct in absence_check_types
|
|
]
|
|
self.fields['booking_status'].choices = status_choices
|
|
|
|
def init_billing_date(self):
|
|
billing_dates = self.agenda_pricing.billingdates.order_by('date_start')
|
|
if not billing_dates:
|
|
del self.fields['billing_date']
|
|
return
|
|
self.fields['billing_date'].queryset = billing_dates
|
|
|
|
def clean_event_slug(self):
|
|
event_slug = self.cleaned_data['event_slug']
|
|
if not self.agenda:
|
|
return event_slug
|
|
try:
|
|
self.serialized_event = get_event('%s@%s' % (self.agenda.slug, event_slug))
|
|
except ChronoError as e:
|
|
raise forms.ValidationError(e)
|
|
|
|
event_date = datetime.datetime.fromisoformat(self.serialized_event['start_datetime']).date()
|
|
if event_date < self.agenda_pricing.date_start or event_date >= self.agenda_pricing.date_end:
|
|
raise ValidationError(_('This event takes place outside the period covered by this pricing'))
|
|
|
|
return event_slug
|
|
|
|
def clean_booking_status(self):
|
|
original_booking_status = self.cleaned_data['booking_status']
|
|
self.booking_status = original_booking_status
|
|
if '::' in original_booking_status:
|
|
# split value to get booking status and selected check_type
|
|
self.booking_status, self.check_type_slug = original_booking_status.split('::')
|
|
return original_booking_status
|
|
|
|
def get_subscription(self, user_external_id, start_date, end_date):
|
|
try:
|
|
subscriptions = get_subscriptions(self.agenda.slug, user_external_id)
|
|
except ChronoError as e:
|
|
self.add_error('user_external_id', str(e))
|
|
return
|
|
for subscription in subscriptions:
|
|
sub_start_date = datetime.date.fromisoformat(subscription['date_start'])
|
|
sub_end_date = datetime.date.fromisoformat(subscription['date_end'])
|
|
if sub_start_date >= end_date:
|
|
continue
|
|
if sub_end_date <= start_date:
|
|
continue
|
|
return subscription
|
|
error_message = _('No subscription found for this event')
|
|
if self.agenda_pricing.flat_fee_schedule:
|
|
error_message = _('No subscription found for this period')
|
|
self.add_error('user_external_id', error_message)
|
|
|
|
def clean(self):
|
|
super().clean()
|
|
|
|
user_external_id = self.cleaned_data.get('user_external_id')
|
|
start_date = None
|
|
end_date = None
|
|
if self.agenda_pricing.flat_fee_schedule and self.agenda_pricing.subscription_required:
|
|
if self.cleaned_data.get('billing_date'):
|
|
start_date = self.cleaned_data['billing_date'].date_start
|
|
next_billing_date = (
|
|
self.fields['billing_date'].queryset.filter(date_start__gt=start_date).first()
|
|
)
|
|
end_date = next_billing_date.date_start if next_billing_date else self.agenda_pricing.date_end
|
|
else:
|
|
start_date = self.agenda_pricing.date_start
|
|
end_date = self.agenda_pricing.date_end
|
|
elif self.serialized_event:
|
|
start_date = datetime.datetime.fromisoformat(self.serialized_event['start_datetime']).date()
|
|
end_date = start_date + datetime.timedelta(days=1)
|
|
if user_external_id and start_date and end_date:
|
|
self.serialized_subscription = self.get_subscription(user_external_id, start_date, end_date)
|
|
|
|
def compute(self):
|
|
try:
|
|
if self.agenda_pricing.flat_fee_schedule:
|
|
return self.compute_for_flat_fee_schedule()
|
|
return self.compute_for_event()
|
|
except PricingError as e:
|
|
return {
|
|
'error': type(e),
|
|
'error_details': e.details,
|
|
}
|
|
|
|
def compute_for_flat_fee_schedule(self):
|
|
if self.agenda_pricing.subscription_required and not self.serialized_subscription:
|
|
return
|
|
pricing_date = self.agenda_pricing.date_start
|
|
if self.cleaned_data.get('billing_date'):
|
|
pricing_date = self.cleaned_data['billing_date'].date_start
|
|
return self.agenda_pricing.get_pricing_data(
|
|
request=self.request,
|
|
pricing_date=pricing_date,
|
|
subscription=self.serialized_subscription,
|
|
user_external_id=self.cleaned_data['user_external_id'],
|
|
adult_external_id=self.cleaned_data['adult_external_id'],
|
|
)
|
|
|
|
def compute_for_event(self):
|
|
if not self.serialized_event or not self.serialized_subscription:
|
|
return
|
|
return self.agenda_pricing.get_pricing_data_for_event(
|
|
request=self.request,
|
|
agenda=self.agenda,
|
|
event=self.serialized_event,
|
|
subscription=self.serialized_subscription,
|
|
check_status={
|
|
'status': self.booking_status,
|
|
'check_type': self.check_type_slug,
|
|
},
|
|
booking={},
|
|
user_external_id=self.cleaned_data['user_external_id'],
|
|
adult_external_id=self.cleaned_data['adult_external_id'],
|
|
)
|
|
|
|
|
|
class NewCheckTypeForm(forms.ModelForm):
|
|
class Meta:
|
|
model = CheckType
|
|
fields = ['label', 'kind', 'pricing', 'pricing_rate']
|
|
|
|
def clean(self):
|
|
super().clean()
|
|
if self.cleaned_data.get('pricing') is not None and self.cleaned_data.get('pricing_rate') is not None:
|
|
raise ValidationError(_('Please choose between pricing and pricing rate.'))
|
|
|
|
|
|
class CheckTypeForm(NewCheckTypeForm):
|
|
class Meta:
|
|
model = CheckType
|
|
fields = ['label', 'slug', 'pricing', 'pricing_rate', 'disabled']
|
|
|
|
def clean_slug(self):
|
|
slug = self.cleaned_data['slug']
|
|
|
|
if self.instance.group.check_types.filter(slug=slug).exclude(pk=self.instance.pk).exists():
|
|
raise ValidationError(_('Another check type exists with the same identifier.'))
|
|
|
|
return slug
|