lingo/lingo/invoicing/models.py

1566 lines
56 KiB
Python

# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import collections
import copy
import dataclasses
import datetime
import decimal
import sys
import traceback
import uuid
from django.conf import settings
from django.contrib.auth.models import Group
from django.core import validators
from django.core.serializers.json import DjangoJSONEncoder
from django.db import connection, models, transaction
from django.template import RequestContext, Template, TemplateSyntaxError, VariableDoesNotExist
from django.template.defaultfilters import floatformat
from django.template.loader import get_template
from django.utils.encoding import force_str
from django.utils.formats import date_format
from django.utils.text import slugify
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from lingo.agendas.chrono import ChronoError, lock_events_check
from lingo.agendas.models import Agenda
from lingo.export_import.models import WithApplicationMixin
from lingo.utils.fields import RichTextField
from lingo.utils.misc import LingoImportError, clean_import_data, generate_slug
from lingo.utils.wcs import (
WCSError,
get_wcs_dependencies_from_template,
get_wcs_json,
get_wcs_matching_card_model,
get_wcs_services,
)
class RegieImportError(Exception):
pass
class PoolPromotionError(Exception):
def __init__(self, msg):
self.msg = msg
class InvoicingError(Exception):
def __init__(self, details=None):
self.details = details or {}
super().__init__()
class PayerError(InvoicingError):
pass
class PayerDataError(InvoicingError):
pass
class Payer(WithApplicationMixin, models.Model):
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
description = models.TextField(_('Description'), null=True, blank=True)
carddef_reference = models.CharField(_('Card Model'), max_length=150)
cached_carddef_json = models.JSONField(blank=True, default=dict)
payer_external_id_prefix = models.CharField(
_('Prefix for payer external id'),
max_length=250,
blank=True,
)
payer_external_id_template = models.CharField(
_('Template for payer external id'),
max_length=1000,
help_text=_('To get payer external id from user external id'),
blank=True,
)
payer_external_id_from_nameid_template = models.CharField(
_('Template for payer external id from nameid'),
max_length=1000,
help_text='{{ cards|objects:"adults"|filter_by_user:nameid|first|get:"id"|default:"" }}',
blank=True,
)
user_fields_mapping = models.JSONField(blank=True, default=dict)
application_component_type = 'payers'
application_label_singular = _('Payer')
application_label_plural = _('Payers')
class Meta:
ordering = ['label']
def __str__(self):
return self.label
def save(self, *args, **kwargs):
if not self.slug:
self.slug = generate_slug(self)
super().save(*args, **kwargs)
if 'update_fields' in kwargs:
# don't populate the cache
return
def populate_cache():
if self.carddef_reference:
parts = self.carddef_reference.split(':')
wcs_key, card_slug = parts[:2]
wcs_site = get_wcs_services().get(wcs_key)
try:
card_schema = get_wcs_json(
wcs_site, 'api/cards/%s/@schema' % card_slug, log_errors='warn'
)
except WCSError:
return
if not card_schema:
return
self.cached_carddef_json = card_schema
self.save(update_fields=['cached_carddef_json'])
populate_cache()
@property
def base_slug(self):
return slugify(self.label)
def get_dependencies(self):
if self.carddef_reference:
parts = self.carddef_reference.split(':')
wcs_key, card_slug = parts[:2]
wcs_site_url = get_wcs_services().get(wcs_key)['url']
urls = {
'export': f'{wcs_site_url}api/export-import/cards/{card_slug}/',
'dependencies': f'{wcs_site_url}api/export-import/cards/{card_slug}/dependencies/',
'redirect': f'{wcs_site_url}api/export-import/cards/{card_slug}/redirect/',
}
yield {
'type': 'cards',
'id': card_slug,
'text': self.cached_carddef_json.get('name'),
'urls': urls,
}
yield from get_wcs_dependencies_from_template(self.payer_external_id_template)
yield from get_wcs_dependencies_from_template(self.payer_external_id_from_nameid_template)
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'description': self.description,
'carddef_reference': self.carddef_reference,
'payer_external_id_prefix': self.payer_external_id_prefix,
'payer_external_id_template': self.payer_external_id_template,
'user_fields_mapping': self.user_fields_mapping,
}
@classmethod
def import_json(cls, data):
data = copy.deepcopy(data)
data = clean_import_data(cls, data)
payer, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
return created, payer
@property
def carddef_name(self):
if not self.carddef_reference:
return
result = get_wcs_matching_card_model(self.carddef_reference)
if not result:
return
return result
@property
def carddef_fields(self):
if not self.cached_carddef_json:
return
return {
f['varname']: f['label']
for f in self.cached_carddef_json.get('fields')
if f.get('varname') and f['type'] != 'page'
}
@property
def user_variables(self):
return [
('first_name', _('First name')),
('last_name', _('Last name')),
('address', _('Address')),
('demat', _('Demat')),
('direct_debit', _('Direct debit')),
]
@property
def user_fields(self):
result = []
for key, label in self.user_variables:
value = ''
if self.user_fields_mapping.get(key):
varname = self.user_fields_mapping.get(key)
value = self.carddef_fields.get(varname) or ''
result.append((label, value))
return result
def _get_payer_external_id(self, request, original_context, template_key):
context = RequestContext(request)
context.push(original_context)
tplt = getattr(self, template_key) or ''
if not tplt:
raise PayerError(details={'reason': 'empty-template'})
try:
value = Template(tplt).render(context)
if not value:
raise PayerError(details={'reason': 'empty-result'})
return '%s%s' % (self.payer_external_id_prefix, value)
except TemplateSyntaxError:
raise PayerError(details={'reason': 'syntax-error'})
except VariableDoesNotExist:
raise PayerError(details={'reason': 'variable-error'})
def get_payer_external_id(self, request, original_context):
return self._get_payer_external_id(
request=request,
original_context=original_context,
template_key='payer_external_id_template',
)
def get_payer_external_id_from_nameid(self, request, original_context):
return self._get_payer_external_id(
request=request,
original_context=original_context,
template_key='payer_external_id_from_nameid_template',
)
def get_payer_data(self, request, payer_external_id):
if not self.carddef_reference:
raise PayerError(details={'reason': 'missing-card-model'})
result = {}
context = RequestContext(request)
payer_external_raw_id = None
if ':' in payer_external_id:
payer_external_raw_id = payer_external_id.split(':')[1]
context.push({'payer_external_id': payer_external_raw_id or payer_external_id})
bool_keys = ['demat', 'direct_debit']
for key, dummy in self.user_variables:
if not self.user_fields_mapping.get(key):
if key not in bool_keys:
raise PayerDataError(details={'key': key, 'reason': 'not-defined'})
tplt = 'False'
else:
tplt = (
'{{ cards|objects:"%s"|filter_by_internal_id:payer_external_id|include_fields|first|get:"fields"|get:"%s"|default:"" }}'
% (
self.carddef_reference.split(':')[1],
self.user_fields_mapping[key],
)
)
value = Template(tplt).render(context)
if not value:
if key not in bool_keys:
raise PayerDataError(details={'key': key, 'reason': 'empty-result'})
value = False
if key in bool_keys:
if value in ('True', 'true', '1'):
value = True
elif value in ('False', 'false', '0'):
value = False
if not isinstance(value, bool):
raise PayerDataError(details={'key': key, 'reason': 'not-a-boolean'})
result[key] = value
return result
INVOICE_MODELS = [
('basic', _('Basic')),
('middle', _('Middle')),
('full', _('Full')),
]
class Regie(WithApplicationMixin, models.Model):
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
description = models.TextField(
_('Description'), null=True, blank=True, help_text=_('Optional regie description.')
)
cashier_role = models.ForeignKey(
Group,
blank=True,
null=True,
default=None,
related_name='+',
verbose_name=_('Cashier Role'),
on_delete=models.SET_NULL,
)
payer = models.ForeignKey(Payer, on_delete=models.PROTECT, blank=True, null=True)
counter_name = models.CharField(
_('Counter name'),
default='{yy}',
max_length=50,
)
invoice_number_format = models.CharField(
_('Invoice number format'),
default='F{regie_id:02d}-{yy}-{mm}-{number:07d}',
max_length=100,
)
payment_number_format = models.CharField(
_('Payment number format'),
default='R{regie_id:02d}-{yy}-{mm}-{number:07d}',
max_length=100,
)
credit_number_format = models.CharField(
_('Credit number format'),
default='A{regie_id:02d}-{yy}-{mm}-{number:07d}',
max_length=100,
)
refund_number_format = models.CharField(
_('Refund number format'),
default='V{regie_id:02d}-{yy}-{mm}-{number:07d}',
max_length=100,
)
invoice_model = models.CharField(
_('Invoice model'),
max_length=10,
choices=INVOICE_MODELS,
default='middle',
)
invoice_custom_text = RichTextField(
_('Custom text in invoice'), blank=True, null=True, help_text=_('Displayed in footer.')
)
invoice_main_colour = models.CharField(_('Main colour in invoice'), max_length=7, default='#DF5A13')
cashier_name = models.CharField(_('Cashier name'), max_length=256, blank=True)
city_name = models.CharField(_('City name'), max_length=256, blank=True)
application_component_type = 'regies'
application_label_singular = _('Regie')
application_label_plural = _('Regies')
class Meta:
ordering = ['label']
def __str__(self):
return self.label
def save(self, *args, **kwargs):
if not self.slug:
self.slug = generate_slug(self)
super().save(*args, **kwargs)
@property
def base_slug(self):
return slugify(self.label)
def get_dependencies(self):
yield self.cashier_role
yield self.payer
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'description': self.description,
'permissions': {
'cashier': self.cashier_role.name if self.cashier_role else None,
},
'payer': self.payer.slug if self.payer else None,
'payment_types': [p.export_json() for p in self.paymenttype_set.all()],
}
@classmethod
def import_json(cls, data):
data = copy.deepcopy(data)
payment_types = data.pop('payment_types', [])
permissions = data.pop('permissions') or {}
data = clean_import_data(cls, data)
role_name = permissions.get('cashier')
if role_name:
try:
data['cashier_role'] = Group.objects.get(name=role_name)
except Group.DoesNotExists:
raise RegieImportError('Missing role: %s' % role_name)
except Group.MultipleObjectsReturned:
raise RegieImportError('Multiple role exist with the name: %s' % role_name)
if data['payer']:
try:
data['payer'] = Payer.objects.get(slug=data['payer'])
except Payer.DoesNotExist:
raise LingoImportError(_('Missing "%s" payer') % data['payer'])
regie, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
for payment_type in payment_types:
payment_type['regie'] = regie
PaymentType.import_json(payment_type)
return created, regie
def get_counter_name(self, invoice_date):
return self.counter_name.format(
yyyy=invoice_date.strftime('%Y'),
yy=invoice_date.strftime('%y'),
mm=invoice_date.strftime('%m'),
)
def format_number(self, invoice_date, invoice_number, kind):
number_format = getattr(self, '%s_number_format' % kind)
return number_format.format(
yyyy=invoice_date.strftime('%Y'),
yy=invoice_date.strftime('%y'),
mm=invoice_date.strftime('%m'),
number=invoice_number,
regie_id=self.pk,
)
def get_payer_external_id(self, request, user_external_id):
if not self.payer:
raise PayerError(details={'reason': 'missing-payer'})
context = {'user_external_id': user_external_id}
if ':' in user_external_id:
context['user_external_raw_id'] = user_external_id.split(':')[1]
return self.payer.get_payer_external_id(request, context)
def get_payer_external_id_from_nameid(self, request, nameid):
if not self.payer:
raise PayerError(details={'reason': 'missing-payer'})
context = {'nameid': nameid}
return self.payer.get_payer_external_id_from_nameid(request, context)
def get_payer_data(self, request, payer_external_id):
if not self.payer:
raise PayerError(details={'reason': 'missing-payer'})
return self.payer.get_payer_data(request, payer_external_id)
class Campaign(models.Model):
label = models.CharField(_('Label'), max_length=150)
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
date_start = models.DateField(_('Start date'))
date_end = models.DateField(_('End date'))
date_publication = models.DateField(_('Publication date'))
date_payment_deadline = models.DateField(_('Payment deadline'))
date_due = models.DateField(_('Due date'))
date_debit = models.DateField(_('Debit date'))
injected_lines = models.CharField(
_('Integrate injected lines'),
choices=[
('no', _('No')),
('period', _('Yes, only for the period')),
('all', _('Yes, all injected lines before the end of the period')),
],
default='no',
max_length=10,
)
agendas = models.ManyToManyField(Agenda, related_name='campaigns')
invalid = models.BooleanField(default=False)
finalized = models.BooleanField(default=False)
invoice_model = models.CharField(
_('Invoice model'),
max_length=10,
choices=INVOICE_MODELS,
default='middle',
)
invoice_custom_text = RichTextField(
_('Custom text in invoice'),
blank=True,
null=True,
help_text=_('Displayed under the address and additional information blocks.'),
)
def __str__(self):
return _('%(label)s (%(start)s - %(end)s)') % {
'label': self.label,
'start': date_format(self.date_start, 'd/m/Y'),
'end': date_format(self.date_end, 'd/m/Y'),
}
def mark_as_valid(self):
self.invalid = False
self.save()
def mark_as_invalid(self, commit=True):
self.invalid = True
if commit:
self.save()
def mark_as_finalized(self):
self.finalized = True
self.save()
def generate(self, spool=True):
pool = self.pool_set.create(draft=True)
try:
pool.init()
except Exception:
return
if spool and 'uwsgi' in sys.modules:
from lingo.invoicing.spooler import generate_invoices
tenant = getattr(connection, 'tenant', None)
transaction.on_commit(
lambda: generate_invoices.spool(
campaign_id=str(self.pk), pool_id=str(pool.pk), domain=getattr(tenant, 'domain_url', None)
)
)
return
pool.generate_invoices()
class Pool(models.Model):
campaign = models.ForeignKey(Campaign, on_delete=models.PROTECT)
draft = models.BooleanField(default=True)
created_at = models.DateTimeField(auto_now_add=True)
completed_at = models.DateTimeField(null=True)
status = models.CharField(
choices=[
('registered', _('Registered')),
('running', _('Running')),
('failed', _('Failed')),
('completed', _('Completed')),
],
default='registered',
max_length=100,
)
exception = models.TextField()
@property
def is_last(self):
return not self.campaign.pool_set.filter(created_at__gt=self.created_at).exists()
def init(self):
from lingo.invoicing import utils
try:
agendas = utils.get_agendas(pool=self)
if agendas:
lock_events_check(
agenda_slugs=[a.slug for a in agendas],
date_start=self.campaign.date_start,
date_end=self.campaign.date_end,
)
except ChronoError as e:
self.status = 'failed'
self.exception = e.msg
self.completed_at = now()
self.save()
raise
except Exception:
self.status = 'failed'
self.exception = traceback.format_exc()
self.completed_at = now()
self.save()
raise
def generate_invoices(self):
from lingo.invoicing import utils
self.status = 'running'
self.save()
try:
# get agendas with pricing corresponding to the period
agendas = utils.get_agendas(pool=self)
# get subscribed users for each agenda, for the period
users = utils.get_users_from_subscriptions(agendas=agendas, pool=self)
# get invoice lines for all subscribed users, for each agenda in the corresponding period
lines = utils.get_all_invoice_lines(agendas=agendas, users=users, pool=self)
# and generate invoices
utils.generate_invoices_from_lines(all_lines=lines, pool=self)
except ChronoError as e:
self.status = 'failed'
self.exception = e.msg
except Exception:
self.status = 'failed'
self.exception = traceback.format_exc()
raise
finally:
if self.status == 'running':
self.status = 'completed'
self.completed_at = now()
self.save()
def promote(self):
if not self.is_last:
# not the last
raise PoolPromotionError('Pool too old')
if not self.draft:
# not a draft
raise PoolPromotionError('Pool is final')
if self.status != 'completed':
# not completed
raise PoolPromotionError('Pool is not completed')
final_pool = copy.deepcopy(self)
final_pool.pk = None
final_pool.draft = False
final_pool.status = 'registered'
final_pool.completed_at = None
final_pool.save()
if 'uwsgi' in sys.modules:
from lingo.invoicing.spooler import populate_from_draft
tenant = getattr(connection, 'tenant', None)
transaction.on_commit(
lambda: populate_from_draft.spool(
draft_pool_id=str(self.pk),
final_pool_id=str(final_pool.pk),
domain=getattr(tenant, 'domain_url', None),
)
)
return
final_pool.populate_from_draft(self)
def populate_from_draft(self, draft_pool):
try:
self.status = 'running'
self.save()
for invoice in draft_pool.draftinvoice_set.all().order_by('pk'):
invoice.promote(pool=self)
for line in draft_pool.draftjournalline_set.filter(invoice_line__isnull=True).order_by('pk'):
line.promote(pool=self)
except Exception:
self.status = 'failed'
self.exception = traceback.format_exc()
raise
finally:
if self.status == 'running':
self.status = 'completed'
self.completed_at = now()
self.save()
class InjectedLine(models.Model):
event_date = models.DateField()
slug = models.SlugField(max_length=250)
label = models.CharField(max_length=260)
amount = models.DecimalField(max_digits=9, decimal_places=2)
user_external_id = models.CharField(max_length=250)
payer_external_id = models.CharField(max_length=250)
payer_first_name = models.CharField(max_length=250)
payer_last_name = models.CharField(max_length=250)
payer_address = models.TextField()
payer_demat = models.BooleanField(default=False)
payer_direct_debit = models.BooleanField(default=False)
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
class AbstractJournalLine(models.Model):
event_date = models.DateField()
slug = models.SlugField(max_length=250)
label = models.CharField(max_length=260)
description = models.CharField(max_length=500)
amount = models.DecimalField(max_digits=9, decimal_places=2)
quantity = models.IntegerField(default=1)
quantity_type = models.CharField(
max_length=10,
choices=[
('units', _('Units')),
('minutes', _('Minutes')),
],
default='units',
)
user_external_id = models.CharField(max_length=250)
user_first_name = models.CharField(max_length=250)
user_last_name = models.CharField(max_length=250)
payer_external_id = models.CharField(max_length=250)
payer_first_name = models.CharField(max_length=250)
payer_last_name = models.CharField(max_length=250)
payer_address = models.TextField()
payer_demat = models.BooleanField(default=False)
payer_direct_debit = models.BooleanField(default=False)
event = models.JSONField(default=dict)
booking = models.JSONField(default=dict)
pricing_data = models.JSONField(default=dict, encoder=DjangoJSONEncoder)
status = models.CharField(
max_length=10,
choices=[
('success', _('Success')),
('warning', _('Warning')),
('error', _('Error')),
],
)
error_status = models.CharField(
max_length=10,
choices=[
('ignored', _('Ignored')),
('fixed', _('Fixed')),
],
blank=True,
)
pool = models.ForeignKey(Pool, on_delete=models.PROTECT, null=True)
from_injected_line = models.ForeignKey(InjectedLine, on_delete=models.PROTECT, null=True)
class Meta:
abstract = True
@property
def user_name(self):
user_name = '%s %s' % (self.user_first_name, self.user_last_name)
return user_name.strip()
@property
def payer_name(self):
payer_name = '%s %s' % (self.payer_first_name, self.payer_last_name)
return payer_name.strip()
def get_error_display(self):
if self.status == 'success':
return
error_class = str(self.pricing_data.get('error'))
error_details = self.pricing_data.get('error_details', {})
error_messages = {
'PricingNotFound': _('Agenda pricing not found'),
'CriteriaConditionNotFound': _('No matching criteria for category: %(category)s'),
'MultipleDefaultCriteriaCondition': _(
'Multiple default criteria found for category: %(category)s'
),
'PricingDataError': _('Impossible to determine a pricing for criterias: %(criterias)s'),
'PricingDataFormatError': _('Pricing is not a %(wanted)s: %(pricing)s'),
'PricingReductionRateError': _('Impossible to determine a reduction rate'),
'PricingReductionRateFormatError': _('Reduction rate is not a %(wanted)s: %(reduction_rate)s'),
'PricingReductionRateValueError': _('Reduction rate bad value: %(reduction_rate)s'),
'PricingEffortRateTargetError': _('Impossible to determine an effort rate target'),
'PricingEffortRateTargetFormatError': _(
'Effort rate target is not a %(wanted)s: %(effort_rate_target)s'
),
'PricingEffortRateTargetValueError': _('Effort rate target bad value: %(effort_rate_target)s'),
'PricingUnknownCheckStatusError': _('Unknown check status: %(status)s'),
'PricingEventNotCheckedError': _('Event is not checked'),
'PricingBookingNotCheckedError': _('Booking is not checked'),
'PricingMultipleBookingError': _('Multiple booking found'),
'PricingBookingCheckTypeError': _('Check type error: %(reason)s'),
'PayerError': _('Impossible to determine payer: %(reason)s'),
'PayerDataError': _('Impossible to get payer %(key)s: %(reason)s'),
}
formats = {
'decimal': _('decimal'),
}
reason = None
if error_details.get('reason'):
reasons = {
'not-found': _('not found'),
'wrong-kind': _('wrong kind (group: %(check_type_group)s, check type: %(check_type)s)'),
'not-configured': _(
'pricing not configured (group: %(check_type_group)s, check type: %(check_type)s)'
),
'empty-template': _('template is empty'),
'empty-result': _('result is empty'),
'syntax-error': _('syntax error'),
'variable-error': _('variable error'),
'missing-card-model': _('card model is not configured'),
'missing-payer': _('payer is not configured on regie'),
'not-a-boolean': _('result is not a boolean'),
'not-defined': _('mapping not defined'),
}
reason = reasons.get(error_details['reason']) % {
'check_type': error_details.get('check_type'),
'check_type_group': error_details.get('check_type_group'),
}
return (
error_messages.get(error_class, '')
% {
'category': error_details.get('category'),
'criterias': ', '.join(
'%s (%s)' % (v, _('category: %s') % k)
for k, v in error_details.get('criterias', {}).items()
),
'pricing': error_details.get('pricing'),
'wanted': formats.get(error_details.get('wanted')),
'status': error_details.get('status'),
'reason': reason,
'key': error_details.get('key'),
'reduction_rate': error_details.get('reduction_rate'),
'effort_rate_target': error_details.get('effort_rate_target'),
}
or error_class
)
def get_chrono_event_url(self):
if not settings.KNOWN_SERVICES.get('chrono'):
return
chrono = list(settings.KNOWN_SERVICES['chrono'].values())[0]
chrono_url = chrono.get('url')
if not chrono_url:
return
if not self.event.get('agenda') or not self.event.get('slug'):
return
return '%smanage/agendas/%s/events/%s/' % (chrono_url, self.event['agenda'], self.event['slug'])
class DraftJournalLine(AbstractJournalLine):
invoice_line = models.ForeignKey(
'invoicing.DraftInvoiceLine', on_delete=models.PROTECT, null=True, related_name='journal_lines'
)
def promote(self, pool=None, invoice_line=None):
final_line = copy.deepcopy(self)
final_line.__class__ = JournalLine
final_line.pk = None
final_line.pool = pool
final_line.invoice_line = invoice_line
final_line.error_status = ''
final_line.save()
class JournalLine(AbstractJournalLine):
invoice_line = models.ForeignKey(
'invoicing.InvoiceLine', on_delete=models.PROTECT, null=True, related_name='journal_lines'
)
class Counter(models.Model):
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
name = models.CharField(max_length=128)
value = models.PositiveIntegerField(default=0)
kind = models.CharField(
max_length=10,
choices=[
('invoice', _('Invoice')),
('payment', _('Payment')),
('credit', _('Credit')),
('refund', _('Refund')),
],
)
class Meta:
unique_together = (('regie', 'name', 'kind'),)
@classmethod
def get_count(cls, regie, name, kind):
queryset = cls.objects.select_for_update()
with transaction.atomic():
counter, dummy = queryset.get_or_create(regie=regie, name=name, kind=kind)
counter.value += 1
counter.save()
return counter.value
class AbstractInvoiceObject(models.Model):
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
label = models.CharField(_('Label'), max_length=300)
total_amount = models.DecimalField(max_digits=9, decimal_places=2, default=0)
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
payer_external_id = models.CharField(max_length=250)
payer_first_name = models.CharField(max_length=250)
payer_last_name = models.CharField(max_length=250)
payer_address = models.TextField()
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
abstract = True
@property
def payer_name(self):
payer_name = '%s %s' % (self.payer_first_name, self.payer_last_name)
return payer_name.strip()
@property
def payer_external_raw_id(self):
if ':' in self.payer_external_id:
return self.payer_external_id.split(':')[1]
return self.payer_external_id
class AbstractInvoice(AbstractInvoiceObject):
date_publication = models.DateField(_('Publication date'))
date_payment_deadline = models.DateField(_('Payment deadline'))
date_due = models.DateField(_('Due date'))
date_debit = models.DateField(_('Debit date'), null=True)
payer_demat = models.BooleanField(default=False)
payer_direct_debit = models.BooleanField(default=False)
pool = models.ForeignKey(Pool, on_delete=models.PROTECT, null=True)
payment_callback_url = models.URLField(blank=True)
class Meta:
abstract = True
def get_grouped_and_ordered_lines(self):
lines = []
possible_status = ['presence', 'absence', 'cancelled', '']
for line in self.lines.all():
# build event date/datetime, and check status
event_date = line.event_date.strftime('Y-m-d')
event_status = 'z' * 5000
if line.details:
event_date = '%s:%s' % (line.event_date, line.details.get('event_time') or '')
status = line.details.get('status') or ''
event_status = '%s:%s' % (possible_status.index(status), status)
if status in ['presence', 'absence'] and line.details.get('check_type_label'):
# note: presence without reason will be sorted first
event_status += ':%s' % line.details['check_type_label']
if status == 'absence' and not line.details.get('check_type_label'):
# so absence without reason will be sorted last
event_status += ':%s' % ('z' * 5000)
lines.append(
(
line,
# sort by user
line.user_external_id,
# by activity
line.activity_label or 'z' * 5000,
# by date/datetime
event_date,
# by slug
line.event_slug,
# by check status
event_status,
# and pk
line.pk,
)
)
lines = sorted(
lines,
key=lambda li: li[1:],
)
lines = [li[0] for li in lines]
return lines
def html(self):
template = get_template('lingo/invoicing/invoice.html')
lines = self.get_grouped_and_ordered_lines()
context = {
'regie': self.regie,
'object': self,
'invoice': self,
'lines': lines,
'appearance_settings': AppearanceSettings.singleton(),
}
if self.pool and self.pool.campaign.invoice_model == 'full':
context['lines_for_details'] = [li for li in lines if li.description and li.display_description()]
return template.render(context)
def payments_html(self):
template = get_template('lingo/invoicing/payments_certificate.html')
context = {
'regie': self.regie,
'invoice': self,
'object': self,
'payments': self.get_invoice_payments(),
'appearance_settings': AppearanceSettings.singleton(),
}
return template.render(context)
class DraftInvoice(AbstractInvoice):
@property
def formatted_number(self):
return '%s-%s' % (_('TEMPORARY'), self.pk)
def promote(self, pool=None):
final_invoice = copy.deepcopy(self)
final_invoice.__class__ = Invoice
final_invoice.pk = None
final_invoice.uuid = uuid.uuid4()
final_invoice.pool = pool
final_invoice.set_number()
final_invoice.paid_amount = 0
final_invoice.remaining_amount = 0
final_invoice.cancelled_at = None
final_invoice.save()
for line in self.lines.all().order_by('pk'):
line.promote(pool=pool, invoice=final_invoice)
return final_invoice
def promote_into_credit(self, label):
credit = copy.deepcopy(self)
credit.__class__ = Credit
credit.pk = None
credit.uuid = uuid.uuid4()
credit.set_number()
credit.assigned_amount = 0
credit.remaining_amount = 0
credit.label = label
credit.save()
for line in self.lines.all().order_by('pk'):
line.promote_into_credit(credit=credit)
return credit
class Invoice(AbstractInvoice):
number = models.PositiveIntegerField(default=0)
formatted_number = models.CharField(max_length=200)
paid_amount = models.DecimalField(max_digits=9, decimal_places=2, default=0)
remaining_amount = models.DecimalField(max_digits=9, decimal_places=2, default=0)
cancelled_at = models.DateTimeField(null=True)
def set_number(self):
self.number = Counter.get_count(
regie=self.regie,
name=self.regie.get_counter_name(self.created_at),
kind='invoice',
)
self.formatted_number = self.regie.format_number(self.created_at, self.number, 'invoice')
def normalize(self, for_backoffice=False, label_plus=False):
paid = bool(self.remaining_amount == 0)
payable = True
date_limit = self.date_due if for_backoffice else self.date_payment_deadline
if paid:
payable = False
elif date_limit < datetime.date.today():
payable = False
label = self.label
if label_plus:
label = '%s - %s' % (self.formatted_number, label)
amount = _('%(amount)s') % {'amount': floatformat(self.remaining_amount, 2)}
label += ' ' + _('(amount to pay: %s)') % amount
return {
'id': str(self.uuid),
'display_id': self.formatted_number,
'label': label,
'paid': paid,
'amount': self.remaining_amount,
'remaining_amount': self.remaining_amount,
'total_amount': self.total_amount,
'created': self.created_at.date(),
'pay_limit_date': date_limit if not paid else '',
'online_payment': not self.payer_direct_debit if payable and not for_backoffice else False,
'has_pdf': True,
'has_payments_pdf': bool(paid),
'due_date': self.date_due,
'payment_deadline_date': self.date_payment_deadline,
'disabled': self.payer_direct_debit or not payable,
'is_line': False,
'invoice_label': self.label,
}
def get_invoice_payments(self):
invoice_line_payments = (
InvoiceLinePayment.objects.filter(line__invoice=self)
.select_related('payment', 'payment__payment_type')
.order_by('created_at')
)
invoice_payments = collections.defaultdict(InvoicePayment)
for invoice_line_payment in invoice_line_payments:
payment = invoice_line_payment.payment
invoice_payments[payment].invoice = self
invoice_payments[payment].payment = payment
invoice_payments[payment].amount += invoice_line_payment.amount
return sorted(invoice_payments.values(), key=lambda a: a.payment.created_at)
class Credit(AbstractInvoiceObject):
number = models.PositiveIntegerField(default=0)
formatted_number = models.CharField(max_length=200)
assigned_amount = models.DecimalField(max_digits=9, decimal_places=2, default=0)
remaining_amount = models.DecimalField(max_digits=9, decimal_places=2, default=0)
class Meta:
constraints = [
models.CheckConstraint(
check=models.Q(
models.Q(
('assigned_amount__lte', models.F('total_amount')),
('total_amount__gt', 0),
),
models.Q(
('assigned_amount__gte', models.F('total_amount')),
('total_amount__lt', 0),
),
models.Q(('assigned_amount', 0), ('total_amount', 0)),
_connector='OR',
),
name='assigned_amount_check',
)
]
def set_number(self):
self.number = Counter.get_count(
regie=self.regie,
name=self.regie.get_counter_name(self.created_at),
kind='credit',
)
self.formatted_number = self.regie.format_number(self.created_at, self.number, 'credit')
def get_grouped_and_ordered_lines(self):
lines = [
(
line,
# sort by user
line.user_external_id,
# by slug
line.slug,
# and pk
line.pk,
)
for line in self.lines.all()
]
lines = sorted(
lines,
key=lambda li: li[1:],
)
lines = [li[0] for li in lines]
return lines
def html(self):
template = get_template('lingo/invoicing/credit.html')
lines = self.get_grouped_and_ordered_lines()
context = {
'regie': self.regie,
'object': self,
'credit': self,
'lines': lines,
'appearance_settings': AppearanceSettings.singleton(),
}
return template.render(context)
class AbstractInvoiceLineObject(models.Model):
uuid = models.UUIDField(default=uuid.uuid4, editable=False, null=True)
event_date = models.DateField()
slug = models.CharField(max_length=250)
label = models.CharField(max_length=260)
quantity = models.DecimalField(max_digits=9, decimal_places=2)
unit_amount = models.DecimalField(max_digits=9, decimal_places=2)
total_amount = models.DecimalField(max_digits=9, decimal_places=2)
description = models.CharField(max_length=500)
user_external_id = models.CharField(max_length=250)
user_first_name = models.CharField(max_length=250)
user_last_name = models.CharField(max_length=250)
class Meta:
abstract = True
@property
def user_name(self):
user_name = '%s %s' % (self.user_first_name, self.user_last_name)
return user_name.strip()
class AbstractInvoiceLine(AbstractInvoiceLineObject):
payer_external_id = models.CharField(max_length=250)
payer_first_name = models.CharField(max_length=250)
payer_last_name = models.CharField(max_length=250)
payer_address = models.TextField()
payer_demat = models.BooleanField(default=False)
payer_direct_debit = models.BooleanField(default=False)
details = models.JSONField(default=dict, encoder=DjangoJSONEncoder)
event_slug = models.CharField(max_length=250)
event_label = models.CharField(max_length=260)
agenda_slug = models.CharField(max_length=250)
activity_label = models.CharField(max_length=250)
pool = models.ForeignKey(Pool, on_delete=models.PROTECT, null=True)
class Meta:
abstract = True
@property
def payer_name(self):
payer_name = '%s %s' % (self.payer_first_name, self.payer_last_name)
return payer_name.strip()
def display_description(self):
if self.description == '@overtaking@':
return False
return True
class DraftInvoiceLine(AbstractInvoiceLine):
invoice = models.ForeignKey(DraftInvoice, on_delete=models.PROTECT, null=True, related_name='lines')
def promote(self, pool=None, invoice=None):
final_line = copy.deepcopy(self)
final_line.__class__ = InvoiceLine
final_line.pk = None
final_line.pool = pool
final_line.invoice = invoice
final_line.error_status = ''
final_line.paid_amount = 0
final_line.remaining_amount = 0
final_line.save()
for line in self.journal_lines.all().order_by('pk'):
line.promote(pool=pool, invoice_line=final_line)
def promote_into_credit(self, credit=None):
credit_line = copy.deepcopy(self)
credit_line.__class__ = CreditLine
credit_line.pk = None
credit_line.credit = credit
credit_line.quantity = -self.quantity # inverse quantities, so credit total_amout is positive
credit_line.assigned_amount = 0
credit_line.remaining_amount = 0
credit_line.save()
class InvoiceLine(AbstractInvoiceLine):
invoice = models.ForeignKey(Invoice, on_delete=models.PROTECT, null=True, related_name='lines')
paid_amount = models.DecimalField(max_digits=9, decimal_places=2, default=0)
remaining_amount = models.DecimalField(max_digits=9, decimal_places=2, default=0)
class Meta:
constraints = [
models.CheckConstraint(
check=models.Q(
models.Q(
('paid_amount__lte', models.F('total_amount')),
('total_amount__gt', 0),
),
models.Q(
('paid_amount__gte', models.F('total_amount')),
('total_amount__lt', 0),
),
models.Q(('paid_amount', 0), ('total_amount', 0)),
_connector='OR',
),
name='paid_amount_check',
)
]
class CreditLine(AbstractInvoiceLineObject):
credit = models.ForeignKey(Credit, on_delete=models.PROTECT, related_name='lines')
DEFAULT_PAYMENT_TYPES = [
('credit', _('Credit')),
('creditcard', _('Credit card')),
('cash', _('Cash')),
('check', _('Check')),
('directdebit', _('Direct debit')),
('online', _('Online')),
('cesu', _('CESU')),
('holidaycheck', _('Holiday check')),
]
class PaymentType(models.Model):
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160)
disabled = models.BooleanField(_('Disabled'), default=False)
class Meta:
ordering = ['label']
unique_together = ['regie', 'slug']
def __str__(self):
return self.label
def save(self, *args, **kwargs):
if not self.slug:
self.slug = generate_slug(self)
super().save(*args, **kwargs)
@property
def base_slug(self):
return slugify(self.label)
@classmethod
def create_defaults(cls, regie):
for slug, label in DEFAULT_PAYMENT_TYPES:
cls.objects.get_or_create(regie=regie, slug=slug, defaults={'label': label})
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'disabled': self.disabled,
}
@classmethod
def import_json(cls, data):
data = copy.deepcopy(data)
data = clean_import_data(cls, data)
payment_type, created = cls.objects.update_or_create(
slug=data['slug'], regie=data['regie'], defaults=data
)
return created, payment_type
PAYMENT_INFO = [
('check_issuer', _('Issuer')),
('check_bank', _('Bank/Organism')),
('check_number', _('Number')),
('check_reference', _('Reference')),
]
class Payment(models.Model):
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
number = models.PositiveIntegerField(default=0)
formatted_number = models.CharField(max_length=200)
amount = models.DecimalField(
max_digits=9, decimal_places=2, validators=[validators.MinValueValidator(decimal.Decimal('0.01'))]
)
payment_type = models.ForeignKey(PaymentType, on_delete=models.PROTECT)
payment_info = models.JSONField(blank=True, default=dict)
order_id = models.CharField(max_length=200, null=True)
payer_external_id = models.CharField(max_length=250)
payer_first_name = models.CharField(max_length=250)
payer_last_name = models.CharField(max_length=250)
payer_address = models.TextField()
order_date = models.DateTimeField(null=True)
created_at = models.DateTimeField(auto_now_add=True)
@property
def payer_name(self):
payer_name = '%s %s' % (self.payer_first_name, self.payer_last_name)
return payer_name.strip()
@property
def payer_external_raw_id(self):
if ':' in self.payer_external_id:
return self.payer_external_id.split(':')[1]
return self.payer_external_id
@classmethod
def make_payment(
cls,
regie,
amount,
payment_type,
invoices=None,
lines=None,
order_id=None,
order_date=None,
payment_info=None,
):
invoices = invoices or []
lines = lines or []
line_ids = None
if lines:
line_ids = [li.pk for li in lines]
invoices = Invoice.objects.select_for_update().filter(pk__in=[li.invoice_id for li in lines])
else:
invoices = Invoice.objects.select_for_update().filter(pk__in=[i.pk for i in invoices])
with transaction.atomic():
payment = cls.objects.create(
regie=regie,
amount=amount,
payment_type=payment_type,
order_id=order_id,
order_date=order_date,
payer_external_id=invoices[0].payer_external_id,
payer_first_name=invoices[0].payer_first_name,
payer_last_name=invoices[0].payer_last_name,
payer_address=invoices[0].payer_address,
payment_info=payment_info or {},
)
payment.set_number()
payment.save()
amount_to_assign = amount
for invoice in invoices.order_by('date_publication', 'created_at'):
if not invoice.remaining_amount:
# nothing to pay for this invoice
continue
for line in invoice.lines.order_by('pk'):
if lines and line.pk not in line_ids:
# if lines specified, ignore other lines
continue
if not line.remaining_amount:
# nothing to pay for this line
continue
# paid_amount for this line: it can not be greater than line remaining_amount
paid_amount = decimal.Decimal(min(line.remaining_amount, amount_to_assign))
InvoiceLinePayment.objects.create(
payment=payment,
line=line,
amount=paid_amount,
)
# new amount to assign
amount_to_assign -= paid_amount
if amount_to_assign <= 0:
break
if amount_to_assign <= 0:
break
return payment
def set_number(self):
self.number = Counter.get_count(
regie=self.regie,
name=self.regie.get_counter_name(self.created_at),
kind='payment',
)
self.formatted_number = self.regie.format_number(self.created_at, self.number, 'payment')
def get_payment_info(self):
result = []
for key, label in PAYMENT_INFO:
if self.payment_info.get(key):
result.append((label, self.payment_info[key]))
return result
def get_invoice_payments(self):
if hasattr(self, 'prefetched_invoicelinepayments'):
invoice_line_payments = self.prefetched_invoicelinepayments
else:
invoice_line_payments = self.invoicelinepayment_set.select_related('line__invoice').order_by(
'created_at'
)
invoice_payments = collections.defaultdict(InvoicePayment)
for invoice_line_payment in invoice_line_payments:
invoice = invoice_line_payment.line.invoice
invoice_payments[invoice].invoice = invoice
invoice_payments[invoice].payment = self
invoice_payments[invoice].amount += invoice_line_payment.amount
return sorted(invoice_payments.values(), key=lambda a: a.invoice.created_at)
def html(self):
template = get_template('lingo/invoicing/payment.html')
context = {
'regie': self.regie,
'object': self,
'payment': self,
'invoice_payments': self.get_invoice_payments(),
'appearance_settings': AppearanceSettings.singleton(),
}
return template.render(context)
class InvoiceLinePayment(models.Model):
payment = models.ForeignKey(Payment, on_delete=models.PROTECT)
line = models.ForeignKey(InvoiceLine, on_delete=models.PROTECT)
amount = models.DecimalField(
max_digits=9, decimal_places=2, validators=[validators.MinValueValidator(decimal.Decimal('0.01'))]
)
created_at = models.DateTimeField(auto_now_add=True)
@dataclasses.dataclass
class InvoicePayment:
payment: Payment = None
invoice: Invoice = None
amount: decimal.Decimal = 0
class Refund(models.Model):
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
number = models.PositiveIntegerField(default=0)
formatted_number = models.CharField(max_length=200)
amount = models.DecimalField(
max_digits=9, decimal_places=2, validators=[validators.MinValueValidator(decimal.Decimal('0.01'))]
)
payer_external_id = models.CharField(max_length=250)
payer_first_name = models.CharField(max_length=250)
payer_last_name = models.CharField(max_length=250)
payer_address = models.TextField()
created_at = models.DateTimeField(auto_now_add=True)
@property
def payer_name(self):
payer_name = '%s %s' % (self.payer_first_name, self.payer_last_name)
return payer_name.strip()
@property
def payer_external_raw_id(self):
if ':' in self.payer_external_id:
return self.payer_external_id.split(':')[1]
return self.payer_external_id
def set_number(self):
self.number = Counter.get_count(
regie=self.regie,
name=self.regie.get_counter_name(self.created_at),
kind='refund',
)
self.formatted_number = self.regie.format_number(self.created_at, self.number, 'refund')
class CreditAssignment(models.Model):
invoice = models.ForeignKey(Invoice, on_delete=models.PROTECT, null=True)
payment = models.ForeignKey(Payment, on_delete=models.PROTECT, null=True)
refund = models.ForeignKey(Refund, on_delete=models.PROTECT, null=True)
credit = models.ForeignKey(Credit, on_delete=models.PROTECT)
amount = models.DecimalField(max_digits=9, decimal_places=2)
created_at = models.DateTimeField(auto_now_add=True)
@classmethod
def make_assignments(cls, regie, basket):
credit_qs = (
Credit.objects.select_for_update()
.filter(remaining_amount__gt=0, regie=regie, payer_external_id=basket.payer_external_id)
.order_by('pk')
)
with transaction.atomic():
available_credit = sum(c.remaining_amount for c in credit_qs)
amount_to_assign = min(available_credit, basket.invoice.remaining_amount)
if amount_to_assign == 0:
return
# assign credits
for credit in credit_qs:
paid_amount = decimal.Decimal(min(credit.remaining_amount, amount_to_assign))
CreditAssignment.objects.create(
invoice=basket.invoice,
credit=credit,
amount=paid_amount,
)
# new amount to assign
amount_to_assign -= paid_amount
if amount_to_assign <= 0:
break
class AppearanceSettings(models.Model):
logo = models.ImageField(
verbose_name=_('Logo'),
upload_to='logo',
blank=True,
null=True,
)
address = RichTextField(
verbose_name=_('Address'),
blank=True,
null=True,
)
extra_info = RichTextField(
verbose_name=_('Additional information'),
blank=True,
null=True,
help_text=_('Displayed below the address block.'),
)
@classmethod
def singleton(cls):
return cls.objects.first() or cls()
def logo_base64_encoded(self):
return force_str(base64.encodebytes(self.logo.read()))