1381 lines
52 KiB
Python
1381 lines
52 KiB
Python
#
|
|
# lingo - basket and payment system
|
|
# Copyright (C) 2015 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import re
|
|
import urllib.parse
|
|
from decimal import Decimal
|
|
from functools import reduce
|
|
|
|
import eopayment
|
|
from dateutil import parser
|
|
from django import template
|
|
from django.conf import settings
|
|
from django.contrib.postgres.fields import JSONField
|
|
from django.core import serializers
|
|
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
|
|
from django.core.mail import EmailMultiAlternatives
|
|
from django.db import models
|
|
from django.db.transaction import atomic
|
|
from django.forms import Select
|
|
from django.forms import models as model_forms
|
|
from django.template.loader import render_to_string
|
|
from django.urls import reverse
|
|
from django.utils import dateparse, timezone
|
|
from django.utils.encoding import force_bytes
|
|
from django.utils.formats import localize
|
|
from django.utils.timezone import make_aware, now, utc
|
|
from django.utils.translation import gettext_lazy as _
|
|
from requests import RequestException
|
|
|
|
from combo.apps.notifications.models import Notification
|
|
from combo.data.fields import RichTextField
|
|
from combo.data.library import register_cell_class
|
|
from combo.data.models import CellBase
|
|
from combo.utils import NothingInCacheException, aes_hex_encrypt, requests
|
|
|
|
from .utils import signing_dumps
|
|
|
|
try:
|
|
from mellon.models import UserSAMLIdentifier
|
|
except ImportError:
|
|
UserSAMLIdentifier = None
|
|
|
|
|
|
logger = logging.getLogger('combo.apps.lingo')
|
|
|
|
|
|
class LingoException(Exception):
|
|
def __init__(self, msg=None, *, transaction=None):
|
|
self.transaction = transaction
|
|
super().__init__(msg)
|
|
|
|
|
|
class UnsignedPaymentException(LingoException):
|
|
pass
|
|
|
|
|
|
class UnknownPaymentException(LingoException):
|
|
pass
|
|
|
|
|
|
EXPIRED = 9999
|
|
|
|
|
|
SERVICES = [
|
|
(eopayment.DUMMY, _('Dummy (for tests)')),
|
|
(eopayment.SYSTEMPAY, 'systempay (Banque Populaire)'),
|
|
(eopayment.SIPS2, _('SIPS (Atos, other countries)')),
|
|
(eopayment.OGONE, _('Ingenico (formerly Ogone)')),
|
|
(eopayment.PAYBOX, 'Paybox'),
|
|
(eopayment.PAYZEN, 'PayZen'),
|
|
(eopayment.TIPI, 'PayFiP/TIPI Régie'),
|
|
(eopayment.PAYFIP_WS, 'PayFiP Régie Web-Service'),
|
|
(eopayment.KEYWARE, 'Keyware'),
|
|
(eopayment.MOLLIE, 'Mollie'),
|
|
(eopayment.SAGA, 'Saga/PayFiP Régie (Futur System)'),
|
|
]
|
|
|
|
|
|
def eopayment_response_to_extra_info(response, **kwargs):
|
|
extra_info = dict(kwargs)
|
|
extra_info.update(
|
|
{
|
|
'eopayment_order_id': response.order_id,
|
|
'eopayment_response': repr(response),
|
|
}
|
|
)
|
|
for k, v in response.bank_data.items():
|
|
extra_info['eopayment_bank_data_' + k] = v
|
|
return extra_info
|
|
|
|
|
|
class RegieException(Exception):
|
|
pass
|
|
|
|
|
|
class RemoteInvoiceException(Exception):
|
|
pass
|
|
|
|
|
|
def build_remote_item(data, regie):
|
|
return RemoteItem(
|
|
id=data.get('id'),
|
|
regie=regie,
|
|
creation_date=data['created'],
|
|
payment_limit_date=data['pay_limit_date'],
|
|
display_id=data.get('display_id'),
|
|
total_amount=data.get('total_amount'),
|
|
amount=data.get('amount'),
|
|
amount_paid=data.get('amount_paid'),
|
|
subject=data.get('label'),
|
|
has_pdf=data.get('has_pdf'),
|
|
online_payment=data.get('online_payment'),
|
|
paid=data.get('paid'),
|
|
payment_date=data.get('payment_date'),
|
|
no_online_payment_reason=data.get('no_online_payment_reason'),
|
|
reference_id=data.get('reference_id'),
|
|
)
|
|
|
|
|
|
class PaymentBackendManager(models.Manager):
|
|
def get_by_natural_key(self, slug):
|
|
return self.get(slug=slug)
|
|
|
|
|
|
class PaymentBackend(models.Model):
|
|
label = models.CharField(verbose_name=_('Label'), max_length=64)
|
|
slug = models.SlugField(
|
|
unique=True,
|
|
verbose_name=_('Identifier'),
|
|
help_text=_('The identifier is used in webservice calls and callback URLs for the payment backend.'),
|
|
)
|
|
service = models.CharField(verbose_name=_('Payment Service'), max_length=64, choices=SERVICES)
|
|
service_options = JSONField(blank=True, default=dict, verbose_name=_('Payment Service Options'))
|
|
|
|
objects = PaymentBackendManager()
|
|
|
|
def __str__(self):
|
|
return str(self.label)
|
|
|
|
@property
|
|
def eopayment(self):
|
|
return self.make_eopayment()
|
|
|
|
def make_eopayment(self, *, request=None, automatic_return_url=None, normal_return_url=None, **kwargs):
|
|
options = self.service_options or {}
|
|
if isinstance(options, str):
|
|
# backward compatibility when used againt postgresql < 9.4 and
|
|
# service_options is received as a string.
|
|
try:
|
|
options = json.loads(options)
|
|
except ValueError:
|
|
pass
|
|
if not isinstance(options, dict):
|
|
options = {}
|
|
if request:
|
|
if not automatic_return_url:
|
|
automatic_return_url = self.callback_url
|
|
if automatic_return_url:
|
|
automatic_return_url = request.build_absolute_uri(automatic_return_url)
|
|
if normal_return_url:
|
|
normal_return_url = request.build_absolute_uri(normal_return_url)
|
|
options['normal_return_url'] = normal_return_url
|
|
options['automatic_return_url'] = automatic_return_url
|
|
else:
|
|
assert (
|
|
not automatic_return_url and not normal_return_url
|
|
), 'make_eopayment must be used with a request to set automatic_return_url or normal_return_url'
|
|
options.update(**kwargs)
|
|
return eopayment.Payment(self.service, options)
|
|
|
|
def natural_key(self):
|
|
return (self.slug,)
|
|
|
|
@classmethod
|
|
def export_all_for_json(cls):
|
|
return [x.get_as_serialized_object() for x in cls.objects.all()]
|
|
|
|
def get_as_serialized_object(self):
|
|
serialized_backend = json.loads(serializers.serialize('json', [self], use_natural_primary_keys=True))[
|
|
0
|
|
]
|
|
del serialized_backend['model']
|
|
return serialized_backend
|
|
|
|
@classmethod
|
|
def load_serialized_objects(cls, json_site):
|
|
for json_backend in json_site:
|
|
cls.load_serialized_object(json_backend)
|
|
|
|
@classmethod
|
|
def load_serialized_object(cls, json_backend):
|
|
json_backend['model'] = str(cls._meta)
|
|
try:
|
|
backend = cls.objects.get_by_natural_key(json_backend['fields']['slug'])
|
|
json_backend['pk'] = backend.pk
|
|
except cls.DoesNotExist:
|
|
pass
|
|
|
|
backend = next(serializers.deserialize('json', json.dumps([json_backend]), ignorenonexistent=True))
|
|
backend.save()
|
|
|
|
def handle_backend_response(self, response, callback=True):
|
|
try:
|
|
transaction = Transaction.objects.get(order_id=response.order_id)
|
|
except Transaction.DoesNotExist:
|
|
raise UnknownPaymentException('Received unknown payment response')
|
|
else:
|
|
logger.debug(
|
|
'lingo: backend "%s" received payment response with id %s',
|
|
self,
|
|
response.order_id,
|
|
extra=eopayment_response_to_extra_info(
|
|
response, lingo_transaction_id=transaction.pk, user=transaction.user
|
|
),
|
|
)
|
|
|
|
# check if transaction belong to the right payment backend
|
|
if not transaction.regie.payment_backend == self:
|
|
raise LingoException('Invalid payment backend', transaction=transaction)
|
|
transaction.handle_backend_response(response, callback=callback)
|
|
return transaction
|
|
|
|
def can_poll_backend(self):
|
|
return self.eopayment.has_payment_status
|
|
|
|
def poll_backend(self, min_age=None, max_age=None):
|
|
if not self.can_poll_backend():
|
|
return
|
|
current_time = now()
|
|
# poll transactions linked to the current backend
|
|
# aged between 5 minutes and 3 hours, max_age can be overriden
|
|
min_age = min_age or datetime.timedelta(minutes=5)
|
|
not_after = current_time - min_age
|
|
max_age = max_age or datetime.timedelta(hours=3)
|
|
not_before = current_time - max_age
|
|
transactions = Transaction.objects.filter(
|
|
regie__payment_backend=self,
|
|
start_date__lt=not_after,
|
|
start_date__gt=not_before,
|
|
status__in=Transaction.RUNNING_STATUSES,
|
|
).order_by('pk')
|
|
last_pk = -1
|
|
# skip fast in order to save some resources on database
|
|
if not transactions.exists():
|
|
return
|
|
while True:
|
|
# lock each transaction before trying to poll it
|
|
with atomic():
|
|
transaction = transactions.filter(pk__gt=last_pk).select_for_update(skip_locked=True).first()
|
|
if not transaction:
|
|
break
|
|
last_pk = transaction.pk
|
|
transaction.poll_backend(ignore_errors=False)
|
|
|
|
@property
|
|
def callback_url(self):
|
|
return reverse('lingo-callback-payment-backend', kwargs={'payment_backend_pk': self.slug})
|
|
|
|
|
|
class Regie(models.Model):
|
|
label = models.CharField(verbose_name=_('Label'), max_length=64)
|
|
slug = models.SlugField(
|
|
unique=True, verbose_name=_('Identifier'), help_text=_('The identifier is used in webservice calls.')
|
|
)
|
|
description = models.TextField(verbose_name=_('Description'))
|
|
is_default = models.BooleanField(verbose_name=_('Default Regie'), default=False)
|
|
webservice_url = models.URLField(_('Webservice URL to retrieve remote items'), blank=True)
|
|
extra_fees_ws_url = models.URLField(_('Webservice URL to compute extra fees'), blank=True)
|
|
payment_min_amount = models.DecimalField(
|
|
_('Minimal payment amount'), max_digits=7, decimal_places=2, default=0
|
|
)
|
|
|
|
text_on_success = models.TextField(
|
|
verbose_name=_('Custom text displayed on success'), blank=True, null=True
|
|
)
|
|
payment_backend = models.ForeignKey(
|
|
PaymentBackend, on_delete=models.CASCADE, verbose_name=_('Payment backend')
|
|
)
|
|
transaction_options = JSONField(blank=True, default=dict, verbose_name=_('Transaction Options'))
|
|
can_pay_only_one_basket_item = models.BooleanField(
|
|
default=True, verbose_name=_('Basket items must be paid individually')
|
|
)
|
|
|
|
def is_remote(self):
|
|
return self.webservice_url != ''
|
|
|
|
class Meta:
|
|
verbose_name = _('Regie')
|
|
ordering = (
|
|
'-is_default',
|
|
'label',
|
|
)
|
|
|
|
def save(self, *args, **kwargs):
|
|
if self.webservice_url and self.webservice_url.endswith('/'):
|
|
self.webservice_url = self.webservice_url.strip('/')
|
|
if self.is_default:
|
|
qs = self.__class__.objects.filter(is_default=True)
|
|
if self.pk:
|
|
qs = qs.exclude(pk=self.pk)
|
|
qs.update(is_default=False)
|
|
elif not self.__class__.objects.filter(is_default=True).exists():
|
|
self.is_default = True
|
|
super().save(*args, **kwargs)
|
|
|
|
def natural_key(self):
|
|
return (self.slug,)
|
|
|
|
def __str__(self):
|
|
return str(self.label)
|
|
|
|
def get_text_on_success(self):
|
|
if self.text_on_success:
|
|
return self.text_on_success
|
|
return _('Your payment has been succesfully registered.')
|
|
|
|
def get_invoices(self, user, history=False, update_paid=False):
|
|
if not self.is_remote():
|
|
return []
|
|
if user:
|
|
url = self.webservice_url + '/invoices/'
|
|
if history:
|
|
url += 'history/'
|
|
|
|
regie_exc_msg = _('Regie "%(label)s" is unavailable, please retry later.') % {
|
|
'label': self.label,
|
|
}
|
|
|
|
try:
|
|
response = requests.get(url, user=user, remote_service='auto', cache_duration=0)
|
|
response.raise_for_status()
|
|
except RequestException as e:
|
|
raise RegieException(regie_exc_msg) from e
|
|
try:
|
|
items = response.json()
|
|
except ValueError as e:
|
|
raise RegieException(regie_exc_msg) from e
|
|
if items.get('err'):
|
|
raise RegieException(regie_exc_msg)
|
|
if items.get('data'):
|
|
if not isinstance(items['data'], list):
|
|
raise RegieException(regie_exc_msg)
|
|
remote_items = [build_remote_item(item, self) for item in items['data']]
|
|
if not history and update_paid:
|
|
# update paid status using known transactions
|
|
RemoteItem.update_paid(self, remote_items)
|
|
return remote_items
|
|
return []
|
|
return []
|
|
|
|
def get_invoice(self, user, invoice_id, log_errors=True, raise_4xx=False, update_paid=False):
|
|
if not self.is_remote():
|
|
return self.basketitem_set.get(pk=invoice_id)
|
|
url = self.webservice_url + '/invoice/%s/' % invoice_id
|
|
response = requests.get(
|
|
url, user=user, remote_service='auto', cache_duration=0, log_errors=log_errors
|
|
)
|
|
if raise_4xx and 400 <= response.status_code < 500:
|
|
raise ObjectDoesNotExist()
|
|
if response.status_code == 404:
|
|
raise ObjectDoesNotExist()
|
|
response.raise_for_status()
|
|
if response.json().get('err'):
|
|
raise RemoteInvoiceException()
|
|
if response.json().get('data') is None:
|
|
raise ObjectDoesNotExist()
|
|
remote_item = build_remote_item(response.json().get('data'), self)
|
|
if update_paid:
|
|
# update paid status using known transactions
|
|
RemoteItem.update_paid(self, [remote_item])
|
|
return remote_item
|
|
|
|
def get_invoice_pdf(self, user, invoice_id):
|
|
"""
|
|
downloads item's file
|
|
"""
|
|
if self.is_remote() and user:
|
|
url = self.webservice_url + '/invoice/%s/pdf/' % invoice_id
|
|
return requests.get(url, user=user, remote_service='auto', cache_duration=0)
|
|
raise PermissionDenied
|
|
|
|
def pay_invoice(self, invoice_id, transaction_id, transaction_date):
|
|
assert timezone.is_aware(transaction_date), 'transaction_date must be an aware date'
|
|
|
|
url = self.webservice_url + '/invoice/%s/pay/' % invoice_id
|
|
transaction_date = transaction_date.astimezone(utc)
|
|
data = {
|
|
'transaction_id': transaction_id,
|
|
'transaction_date': transaction_date.strftime('%Y-%m-%dT%H:%M:%S'),
|
|
}
|
|
headers = {'content-type': 'application/json'}
|
|
try:
|
|
response = requests.post(url, remote_service='auto', data=json.dumps(data), headers=headers)
|
|
if 400 <= response.status_code < 500:
|
|
raise ObjectDoesNotExist()
|
|
response.raise_for_status()
|
|
except RequestException as e:
|
|
raise RemoteInvoiceException from e
|
|
try:
|
|
resp = response.json()
|
|
except ValueError as e:
|
|
raise RemoteInvoiceException from e
|
|
if resp.get('err'):
|
|
raise RemoteInvoiceException
|
|
return resp
|
|
|
|
def as_api_dict(self):
|
|
return {'id': self.slug, 'text': self.label, 'description': self.description}
|
|
|
|
def compute_extra_fees(self, user):
|
|
if not self.extra_fees_ws_url:
|
|
return
|
|
post_data = {'data': []}
|
|
basketitems = BasketItem.get_items_to_be_paid(user).filter(regie=self)
|
|
for basketitem in basketitems.filter(extra_fee=False):
|
|
basketitem_data = {
|
|
'subject': basketitem.subject,
|
|
'source_url': basketitem.source_url,
|
|
'details': basketitem.details,
|
|
'amount': str(basketitem.amount),
|
|
'request_data': basketitem.request_data,
|
|
}
|
|
post_data['data'].append(basketitem_data)
|
|
if not post_data['data']:
|
|
basketitems.filter(extra_fee=True).delete()
|
|
return
|
|
response = requests.post(
|
|
self.extra_fees_ws_url,
|
|
remote_service='auto',
|
|
data=json.dumps(post_data),
|
|
headers={'content-type': 'application/json'},
|
|
)
|
|
if response.status_code != 200 or response.json().get('err'):
|
|
logger.error('failed to compute extra fees (user: %r)', user)
|
|
return
|
|
basketitems.filter(extra_fee=True).delete()
|
|
for extra_fee in response.json().get('data'):
|
|
BasketItem(
|
|
user=user,
|
|
regie=self,
|
|
subject=extra_fee.get('subject'),
|
|
amount=extra_fee.get('amount'),
|
|
extra_fee=True,
|
|
user_cancellable=False,
|
|
).save()
|
|
|
|
def get_remote_pending_invoices(self):
|
|
if not self.is_remote() or UserSAMLIdentifier is None:
|
|
return {}
|
|
url = self.webservice_url + '/users/with-pending-invoices/'
|
|
response = requests.get(
|
|
url, remote_service='auto', cache_duration=0, log_errors=False, without_user=True
|
|
)
|
|
if not response.ok:
|
|
return {}
|
|
return response.json()['data']
|
|
|
|
def get_notification_namespace(self):
|
|
return 'invoice-%s' % self.slug
|
|
|
|
def get_notification_id(self, invoice):
|
|
return '%s:%s' % (self.get_notification_namespace(), invoice.id)
|
|
|
|
def get_notification_reminder_id(self, invoice):
|
|
return '%s:reminder-%s' % (self.get_notification_namespace(), invoice.id)
|
|
|
|
def notify_invoice(self, user, invoice):
|
|
today = timezone.now().date()
|
|
remind_delta = timezone.timedelta(days=settings.LINGO_NEW_INVOICES_REMIND_DELTA)
|
|
active_items_cell = ActiveItems.objects.first()
|
|
if active_items_cell:
|
|
items_page_url = active_items_cell.page.get_online_url()
|
|
else:
|
|
items_page_url = ''
|
|
notification_id = self.get_notification_id(invoice)
|
|
notification_reminder_id = self.get_notification_reminder_id(invoice)
|
|
if invoice.payment_limit_date < today:
|
|
# invoice is out of date
|
|
Notification.objects.find(user, notification_id).forget()
|
|
Notification.objects.find(user, notification_reminder_id).forget()
|
|
else:
|
|
# invoice can be paid
|
|
if invoice.payment_limit_date >= today + remind_delta:
|
|
message = _('Invoice %s to pay') % invoice.subject
|
|
else:
|
|
message = _('Reminder: invoice %s to pay') % invoice.subject
|
|
notification_id = notification_reminder_id
|
|
if not Notification.objects.find(user, notification_id).exists():
|
|
self.notify_remote_invoice_by_email(user, invoice)
|
|
payment_limit_date = datetime.datetime(
|
|
invoice.payment_limit_date.year,
|
|
invoice.payment_limit_date.month,
|
|
invoice.payment_limit_date.day,
|
|
)
|
|
Notification.notify(
|
|
user,
|
|
summary=message,
|
|
id=notification_id,
|
|
url=items_page_url,
|
|
end_timestamp=make_aware(payment_limit_date),
|
|
)
|
|
return notification_id
|
|
|
|
def notify_new_remote_invoices(self):
|
|
if UserSAMLIdentifier is None:
|
|
# remote invoices retrieval requires SAML
|
|
return
|
|
pending_invoices = self.get_remote_pending_invoices()
|
|
notification_ids = []
|
|
for uuid, items in pending_invoices.items():
|
|
try:
|
|
user = UserSAMLIdentifier.objects.get(name_id=uuid).user
|
|
except UserSAMLIdentifier.DoesNotExist:
|
|
continue
|
|
for invoice in items['invoices']:
|
|
remote_invoice = build_remote_item(invoice, self)
|
|
if remote_invoice.total_amount >= self.payment_min_amount:
|
|
notification_ids.append(self.notify_invoice(user, remote_invoice))
|
|
# clear old notifications for invoice not in the source anymore
|
|
Notification.objects.namespace(self.get_notification_namespace()).exclude(
|
|
external_id__in=notification_ids
|
|
).forget()
|
|
|
|
def notify_remote_invoice_by_email(self, user, invoice):
|
|
|
|
subject_template = 'lingo/combo/invoice_email_notification_subject.txt'
|
|
text_body_template = 'lingo/combo/invoice_email_notification_body.txt'
|
|
html_body_template = 'lingo/combo/invoice_email_notification_body.html'
|
|
|
|
payment_url = reverse('view-item', kwargs={'regie_id': self.id, 'item_crypto_id': invoice.crypto_id})
|
|
ctx = settings.TEMPLATE_VARS.copy()
|
|
ctx['invoice'] = invoice
|
|
ctx['payment_url'] = urllib.parse.urljoin(settings.SITE_BASE_URL, payment_url)
|
|
ctx['portal_url'] = settings.SITE_BASE_URL
|
|
subject = render_to_string([subject_template], ctx).strip()
|
|
text_body = render_to_string([text_body_template], ctx)
|
|
html_body = render_to_string([html_body_template], ctx)
|
|
message = EmailMultiAlternatives(subject, text_body, to=[user.email])
|
|
message.attach_alternative(html_body, 'text/html')
|
|
if invoice.has_pdf:
|
|
invoice_pdf = self.get_invoice_pdf(user, invoice.id)
|
|
message.attach('%s.pdf' % invoice.id, invoice_pdf.content, 'application/pdf')
|
|
message.send()
|
|
|
|
@classmethod
|
|
def export_all_for_json(cls):
|
|
return [x.get_as_serialized_object() for x in cls.objects.all()]
|
|
|
|
def get_as_serialized_object(self):
|
|
serialized_regie = serializers.serialize(
|
|
'json', [self], use_natural_primary_keys=True, use_natural_foreign_keys=True
|
|
)
|
|
serialized_regie = json.loads(serialized_regie)[0]
|
|
del serialized_regie['model']
|
|
return serialized_regie
|
|
|
|
@classmethod
|
|
def load_serialized_objects(cls, json_site):
|
|
for json_regie in json_site:
|
|
cls.load_serialized_object(json_regie)
|
|
|
|
@classmethod
|
|
def load_serialized_object(cls, json_regie):
|
|
json_regie['model'] = str(cls._meta)
|
|
try:
|
|
regie = cls.objects.get(slug=json_regie['fields']['slug'])
|
|
json_regie['pk'] = regie.pk
|
|
except cls.DoesNotExist:
|
|
pass
|
|
|
|
regie = next(serializers.deserialize('json', json.dumps([json_regie]), ignorenonexistent=True))
|
|
regie.save()
|
|
|
|
def can_poll_backend(self):
|
|
return self.payment_backend.can_poll_backend()
|
|
|
|
@property
|
|
def eopayment(self):
|
|
return self.make_eopayment()
|
|
|
|
def make_eopayment(self, **kwargs):
|
|
return self.payment_backend.make_eopayment(**kwargs)
|
|
|
|
@property
|
|
def callback_url(self):
|
|
return self.payment_backend.callback_url
|
|
|
|
|
|
class BasketItem(models.Model):
|
|
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True)
|
|
regie = models.ForeignKey(Regie, on_delete=models.CASCADE)
|
|
subject = models.CharField(verbose_name=_('Subject'), max_length=200)
|
|
source_url = models.URLField(_('Source URL'), blank=True)
|
|
details = models.TextField(verbose_name=_('Details'), blank=True)
|
|
amount = models.DecimalField(verbose_name=_('Amount'), decimal_places=2, max_digits=8)
|
|
request_data = JSONField(blank=True, default=dict)
|
|
extra_fee = models.BooleanField(default=False)
|
|
user_cancellable = models.BooleanField(default=True)
|
|
creation_date = models.DateTimeField(auto_now_add=True)
|
|
cancellation_date = models.DateTimeField(null=True)
|
|
waiting_date = models.DateTimeField(null=True)
|
|
payment_date = models.DateTimeField(null=True)
|
|
notification_date = models.DateTimeField(null=True)
|
|
capture_date = models.DateField(null=True)
|
|
email = models.EmailField(null=True)
|
|
reference_id = models.CharField(max_length=200)
|
|
remote_item_id = models.TextField(null=True, unique=True)
|
|
|
|
class Meta:
|
|
ordering = ['regie', 'extra_fee', 'subject']
|
|
|
|
@classmethod
|
|
def get_items_to_be_paid(cls, user, poll=False, raise_on_poll=False):
|
|
qs = cls.objects.filter(
|
|
user=user, payment_date__isnull=True, waiting_date__isnull=True, cancellation_date__isnull=True
|
|
)
|
|
if poll:
|
|
for transaction in Transaction.objects.filter(items__in=qs):
|
|
if transaction.can_poll_backend():
|
|
if raise_on_poll:
|
|
raise NothingInCacheException
|
|
transaction.poll_backend()
|
|
return qs
|
|
|
|
def is_notifiable(self, status='paid'):
|
|
if not self.source_url:
|
|
return True
|
|
url = self.source_url + 'jump/trigger/%s' % status
|
|
# make a GET request to trigger, it will not actually trigger it (it requires a GET)
|
|
# but it will tell us if the trigger doesn't exist (by returning a 404, vs 403 access
|
|
# denied).
|
|
req = requests.get(url, remote_service='auto', timeout=5, log_errors=False)
|
|
if req.status_code == 404:
|
|
return False
|
|
return True
|
|
|
|
def notify(self, status):
|
|
if not self.source_url:
|
|
return
|
|
url = self.source_url + 'jump/trigger/%s' % status
|
|
message = {'result': 'ok'}
|
|
if status == 'paid':
|
|
transaction = self.transaction_set.filter(status__in=(eopayment.ACCEPTED, eopayment.PAID))[0]
|
|
message['transaction_id'] = transaction.id
|
|
message['order_id'] = transaction.order_id
|
|
message['bank_transaction_id'] = transaction.bank_transaction_id
|
|
bank_transaction_date = transaction.bank_transaction_date or transaction.end_date
|
|
bank_transaction_date = bank_transaction_date.astimezone(utc)
|
|
message['bank_transaction_date'] = bank_transaction_date.strftime('%Y-%m-%dT%H:%M:%S')
|
|
message['bank_data'] = transaction.bank_data
|
|
headers = {'content-type': 'application/json'}
|
|
r = requests.post(url, remote_service='auto', data=json.dumps(message), headers=headers, timeout=15)
|
|
r.raise_for_status()
|
|
|
|
def notify_payment(self, notify_origin=True):
|
|
if notify_origin:
|
|
self.notify('paid')
|
|
self.notification_date = timezone.now()
|
|
self.save()
|
|
self.regie.compute_extra_fees(user=self.user)
|
|
|
|
def notify_cancellation(self, notify_origin=False):
|
|
if notify_origin:
|
|
self.notify('cancelled')
|
|
self.cancellation_date = timezone.now()
|
|
self.save()
|
|
self.regie.compute_extra_fees(user=self.user)
|
|
|
|
@property
|
|
def total_amount(self):
|
|
return self.amount
|
|
|
|
@property
|
|
def payment_url(self):
|
|
signature = signing_dumps(self.pk)
|
|
return reverse('basket-item-pay-view', kwargs={'item_signature': signature})
|
|
|
|
|
|
class RemoteItem:
|
|
payment_date = None
|
|
|
|
def __init__(
|
|
self,
|
|
id,
|
|
regie,
|
|
creation_date,
|
|
payment_limit_date,
|
|
total_amount,
|
|
amount,
|
|
amount_paid,
|
|
display_id,
|
|
subject,
|
|
has_pdf,
|
|
online_payment,
|
|
paid,
|
|
payment_date,
|
|
no_online_payment_reason,
|
|
reference_id,
|
|
):
|
|
self.id = id
|
|
self.regie = regie
|
|
self.creation_date = dateparse.parse_date(creation_date or '')
|
|
self.payment_limit_date = dateparse.parse_date(payment_limit_date or '')
|
|
self.total_amount = Decimal(total_amount)
|
|
self.amount = Decimal(amount)
|
|
if amount_paid:
|
|
self.amount_paid = Decimal(amount_paid)
|
|
self.display_id = display_id or self.id
|
|
self.subject = subject
|
|
self.has_pdf = has_pdf
|
|
self.online_payment = online_payment
|
|
self.paid = paid
|
|
self.no_online_payment_reason = no_online_payment_reason
|
|
self.reference_id = reference_id
|
|
if payment_date:
|
|
self.payment_date = parser.parse(payment_date)
|
|
self.waiting_date = None
|
|
|
|
@property
|
|
def no_online_payment_reason_details(self):
|
|
reasons = {
|
|
'litigation': _('This invoice is in litigation.'),
|
|
'autobilling': _('Autobilling has been set for this invoice.'),
|
|
'past-due-date': _('Due date is over.'),
|
|
}
|
|
return settings.LINGO_NO_ONLINE_PAYMENT_REASONS.get(
|
|
self.no_online_payment_reason, reasons.get(self.no_online_payment_reason)
|
|
)
|
|
|
|
@property
|
|
def crypto_id(self):
|
|
return aes_hex_encrypt(settings.SECRET_KEY, force_bytes(str(self.id)))
|
|
|
|
@classmethod
|
|
def transactions_for_remote_items(cls, queryset, remote_items):
|
|
remote_item_ids = {remote_item.id for remote_item in remote_items if not remote_item.paid}
|
|
if not remote_item_ids:
|
|
return Transaction.objects.none()
|
|
|
|
# filter transactions by regie, status and contained remote_item id
|
|
query = reduce(
|
|
models.Q.__or__,
|
|
(models.Q(remote_items__contains=remote_item_id) for remote_item_id in remote_item_ids),
|
|
)
|
|
|
|
# accumulate in paid_items each remote_item earliest payment_date
|
|
for transaction in queryset.filter(query):
|
|
for remote_item_id in transaction.remote_items.split(','):
|
|
if remote_item_id in remote_item_ids:
|
|
yield transaction
|
|
break
|
|
|
|
@classmethod
|
|
def update_paid(cls, regie, remote_items):
|
|
paid_items = {}
|
|
waiting_items = {}
|
|
transaction_qs = Transaction.objects.filter(regie=regie)
|
|
|
|
can_poll_backend = regie.can_poll_backend()
|
|
|
|
# accumulate in paid_items each remote_item earliest payment_date
|
|
for transaction in cls.transactions_for_remote_items(transaction_qs, remote_items):
|
|
if transaction.is_running() and can_poll_backend:
|
|
transaction.poll_backend()
|
|
for remote_item in transaction.remote_items.split(','):
|
|
if transaction.end_date and transaction.is_paid():
|
|
if remote_item not in paid_items:
|
|
paid_items[remote_item] = transaction.end_date
|
|
else:
|
|
paid_items[remote_item] = min(transaction.end_date, paid_items[remote_item])
|
|
elif transaction.status == eopayment.WAITING and can_poll_backend:
|
|
waiting_items[remote_item] = transaction.start_date
|
|
|
|
# update remote_item.paid using paid_items
|
|
for remote_item in remote_items:
|
|
if remote_item.paid:
|
|
continue
|
|
if remote_item.id in paid_items:
|
|
remote_item.paid = True
|
|
remote_item.payment_date = paid_items[remote_item.id]
|
|
elif remote_item.id in waiting_items:
|
|
remote_item.waiting_date = waiting_items[remote_item.id]
|
|
|
|
|
|
def status_label(status):
|
|
return {
|
|
0: _('Running'),
|
|
eopayment.WAITING: _('Running'),
|
|
eopayment.PAID: _('Paid'),
|
|
eopayment.ACCEPTED: _('Paid (accepted)'),
|
|
eopayment.CANCELLED: _('Cancelled'),
|
|
EXPIRED: _('Expired'),
|
|
}.get(status) or _('Unknown')
|
|
|
|
|
|
class Transaction(models.Model):
|
|
regie = models.ForeignKey(Regie, on_delete=models.CASCADE, null=True)
|
|
items = models.ManyToManyField(BasketItem, blank=True)
|
|
remote_items = models.CharField(max_length=512)
|
|
to_be_paid_remote_items = models.CharField(max_length=512, null=True)
|
|
start_date = models.DateTimeField(auto_now_add=True, db_index=True)
|
|
end_date = models.DateTimeField(null=True)
|
|
bank_data = JSONField(blank=True, default=dict)
|
|
order_id = models.CharField(max_length=200)
|
|
bank_transaction_id = models.CharField(max_length=200, null=True)
|
|
bank_transaction_date = models.DateTimeField(blank=True, null=True)
|
|
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True)
|
|
status = models.IntegerField(null=True)
|
|
amount = models.DecimalField(default=0, max_digits=7, decimal_places=2)
|
|
|
|
RUNNING_STATUSES = [0, eopayment.WAITING, eopayment.RECEIVED]
|
|
PAID_STATUSES = [eopayment.PAID, eopayment.ACCEPTED]
|
|
|
|
def is_remote(self):
|
|
return self.remote_items != ''
|
|
|
|
def get_user_name(self):
|
|
if self.user:
|
|
return self.user.get_full_name()
|
|
return _('Anonymous User')
|
|
|
|
def is_paid(self):
|
|
return self.status in self.PAID_STATUSES
|
|
|
|
def is_running(self):
|
|
return self.status in self.RUNNING_STATUSES
|
|
|
|
def get_status_label(self):
|
|
return status_label(self.status)
|
|
|
|
def first_notify_remote_items_of_payments(self):
|
|
self.notify_remote_items_of_payments(self.remote_items)
|
|
|
|
def retry_notify_remote_items_of_payments(self):
|
|
self.notify_remote_items_of_payments(self.to_be_paid_remote_items)
|
|
|
|
def notify_remote_items_of_payments(self, items):
|
|
if not items:
|
|
return
|
|
if not self.is_paid():
|
|
return
|
|
|
|
regie = self.regie
|
|
to_be_paid_remote_items = []
|
|
for item_id in items.split(','):
|
|
try:
|
|
remote_item = regie.get_invoice(user=self.user, invoice_id=item_id, raise_4xx=True)
|
|
with atomic(savepoint=False):
|
|
self.items.add(self.create_paid_invoice_basket_item(item_id, remote_item))
|
|
regie.pay_invoice(item_id, self.order_id, self.bank_transaction_date or self.end_date)
|
|
except ObjectDoesNotExist:
|
|
# 4xx error
|
|
logger.error(
|
|
'unable to retrieve or pay remote item %s from transaction %s, ignore it', item_id, self
|
|
)
|
|
except (RequestException, RemoteInvoiceException):
|
|
# 5xx, err or requests error
|
|
to_be_paid_remote_items.append(item_id)
|
|
logger.warning(
|
|
'unable to notify payment for remote item %s from transaction %s, retry later',
|
|
item_id,
|
|
self,
|
|
)
|
|
except Exception:
|
|
# unknown error
|
|
to_be_paid_remote_items.append(item_id)
|
|
logger.exception(
|
|
'unable to notify payment for remote item %s from transaction %s', item_id, self
|
|
)
|
|
else:
|
|
logger.info('notified payment for remote item %s from transaction %s', item_id, self)
|
|
|
|
self.to_be_paid_remote_items = ','.join(to_be_paid_remote_items) or None
|
|
self.save(update_fields=['to_be_paid_remote_items'])
|
|
|
|
def create_paid_invoice_basket_item(self, item_id, remote_item):
|
|
subject = _('Invoice #%s') % remote_item.display_id
|
|
basket_item, dummy = BasketItem.objects.get_or_create(
|
|
remote_item_id=item_id,
|
|
defaults={
|
|
'user': self.user,
|
|
'regie': remote_item.regie,
|
|
'source_url': '',
|
|
'subject': subject,
|
|
'amount': remote_item.amount,
|
|
'payment_date': self.end_date,
|
|
},
|
|
)
|
|
return basket_item
|
|
|
|
def handle_backend_response(self, response, callback=True):
|
|
logger.debug('lingo: regie "%s" handling response for transaction "%s"', self.regie, self.order_id)
|
|
if self.status == response.result:
|
|
# return early if self status didn't change (it means the
|
|
# payment service sent the response both as server to server and
|
|
# via the user browser and we already handled one).
|
|
return
|
|
|
|
if not self.is_running():
|
|
logger.info(
|
|
'lingo: regie "%s" received payment notification on existing '
|
|
'transaction, status changed, "%s" (%s) -> "%s" (%s)',
|
|
self.regie,
|
|
status_label(self.status),
|
|
self.status,
|
|
status_label(response.result),
|
|
response.result,
|
|
)
|
|
|
|
if not response.signed and not response.result == eopayment.CANCELLED:
|
|
raise UnsignedPaymentException('Received unsigned payment response', transaction=self)
|
|
self.status = response.result
|
|
self.bank_transaction_id = response.transaction_id
|
|
self.bank_data = response.bank_data
|
|
self.end_date = timezone.now()
|
|
# store transaction_date but prevent multiple updates
|
|
if response.transaction_date is None:
|
|
logger.warning('lingo: no transaction date')
|
|
elif self.bank_transaction_date is None:
|
|
self.bank_transaction_date = response.transaction_date
|
|
elif response.transaction_date != self.bank_transaction_date:
|
|
# XXX: don't know if it can happen, but we would like to know when it does
|
|
# as for differed payments there can be multiple notifications.
|
|
logger.error(
|
|
'lingo: regie "%s" new transaction_date for transaction %s(%s) was %s, received %s',
|
|
self.regie,
|
|
self.order_id,
|
|
self.id,
|
|
self.bank_transaction_date,
|
|
response.transaction_date,
|
|
)
|
|
|
|
self.save()
|
|
|
|
if response.result == eopayment.WAITING:
|
|
# mark basket items as waiting for payment confirmation
|
|
self.items.all().update(waiting_date=timezone.now())
|
|
return
|
|
|
|
if response.result == eopayment.CANCELLED:
|
|
# mark basket items as no longer waiting so the user can restart a
|
|
# payment.
|
|
self.items.all().update(waiting_date=None)
|
|
return
|
|
|
|
if response.result not in (eopayment.PAID, eopayment.ACCEPTED):
|
|
return
|
|
|
|
self.items.update(payment_date=self.end_date)
|
|
|
|
for item in self.items.all():
|
|
try:
|
|
item.notify_payment()
|
|
except Exception as e:
|
|
# ignore errors, it will be retried later on if it fails
|
|
logger.warning(
|
|
'lingo: regie "%s" error in sync notification for basket item %s '
|
|
'and transaction %s, %s',
|
|
self.regie,
|
|
item.id,
|
|
self.order_id,
|
|
e,
|
|
)
|
|
|
|
if self.remote_items:
|
|
self.first_notify_remote_items_of_payments()
|
|
|
|
@property
|
|
def eopayment(self):
|
|
return self.regie.eopayment
|
|
|
|
def make_eopayment(self, **kwargs):
|
|
normal_return_url = reverse(
|
|
'lingo-return-payment-backend',
|
|
kwargs={
|
|
'payment_backend_pk': self.regie.payment_backend.id,
|
|
'transaction_signature': signing_dumps(self.pk),
|
|
},
|
|
)
|
|
return self.regie.make_eopayment(normal_return_url=normal_return_url, **kwargs)
|
|
|
|
def can_poll_backend(self):
|
|
return self.regie and self.regie.can_poll_backend()
|
|
|
|
def poll_backend(self, ignore_errors=True):
|
|
with atomic():
|
|
# lock the transaction
|
|
Transaction.objects.filter(pk=self.pk).select_for_update().first()
|
|
try:
|
|
response = self.eopayment.payment_status(self.order_id, transaction_date=self.start_date)
|
|
except eopayment.PaymentException:
|
|
if ignore_errors:
|
|
logger.warning(
|
|
'lingo: regie "%s" polling backend for transaction "%s(%s)" failed',
|
|
self.regie,
|
|
self.order_id,
|
|
self.id,
|
|
exc_info=True,
|
|
)
|
|
return
|
|
raise LingoException('polling failed', transaction=self)
|
|
|
|
logger.debug(
|
|
'lingo: regie "%s" polling backend for transaction "%s(%s)"',
|
|
self.regie,
|
|
self.order_id,
|
|
self.id,
|
|
)
|
|
|
|
if self.status != response.result:
|
|
self.handle_backend_response(response)
|
|
|
|
def __str__(self):
|
|
return f'transaction "{self.order_id}" of regie "{self.regie}"'
|
|
|
|
|
|
class TransactionOperation(models.Model):
|
|
OPERATIONS = [
|
|
('validation', _('Validation')),
|
|
('cancellation', _('Cancellation')),
|
|
]
|
|
transaction = models.ForeignKey(Transaction, on_delete=models.CASCADE)
|
|
kind = models.CharField(max_length=65, choices=OPERATIONS)
|
|
amount = models.DecimalField(decimal_places=2, max_digits=8)
|
|
creation_date = models.DateTimeField(auto_now_add=True)
|
|
bank_result = JSONField(blank=True, default=dict)
|
|
|
|
|
|
@register_cell_class
|
|
class LingoBasketCell(CellBase):
|
|
user_dependant = True
|
|
|
|
class Meta:
|
|
verbose_name = _('Basket')
|
|
|
|
class Media:
|
|
js = (
|
|
'xstatic/jquery-ui.min.js',
|
|
'js/gadjo.js',
|
|
)
|
|
|
|
@classmethod
|
|
def is_enabled(cls):
|
|
return Regie.objects.exists()
|
|
|
|
def is_relevant(self, context):
|
|
if not (getattr(context['request'], 'user', None) and context['request'].user.is_authenticated):
|
|
return False
|
|
return BasketItem.get_items_to_be_paid(context['request'].user).exists()
|
|
|
|
def get_badge(self, context):
|
|
if not (getattr(context['request'], 'user', None) and context['request'].user.is_authenticated):
|
|
return
|
|
items = BasketItem.get_items_to_be_paid(context['request'].user)
|
|
if not items:
|
|
return
|
|
total = sum(x.amount for x in items)
|
|
if total == int(total):
|
|
total = int(total)
|
|
return {'badge': _('%s€') % localize(total)}
|
|
|
|
def render(self, context):
|
|
basket_template = template.loader.get_template('lingo/combo/basket.html')
|
|
items = BasketItem.get_items_to_be_paid(
|
|
context['request'].user, poll=True, raise_on_poll=not context.get('synchronous')
|
|
)
|
|
regies = {}
|
|
for item in items:
|
|
if not item.regie_id in regies:
|
|
regies[item.regie_id] = {'items': [], 'regie': item.regie}
|
|
regies[item.regie_id]['items'].append(item)
|
|
|
|
for items in regies.values():
|
|
items['total'] = sum(x.amount for x in items['items'])
|
|
|
|
context['regies'] = sorted(regies.values(), key=lambda x: x['regie'].label)
|
|
return basket_template.render(context)
|
|
|
|
|
|
@register_cell_class
|
|
class LingoRecentTransactionsCell(CellBase):
|
|
user_dependant = True
|
|
|
|
class Meta:
|
|
verbose_name = _('Recent Transactions')
|
|
|
|
@classmethod
|
|
def is_enabled(cls):
|
|
return Regie.objects.exists()
|
|
|
|
def get_transactions_queryset(self, context, poll=False):
|
|
user = context['request'].user
|
|
# list transactions :
|
|
# * paid by the user
|
|
# * or linked to a BasketItem of the user
|
|
qs_recent_txn = Transaction.objects.filter(
|
|
start_date__gte=timezone.now() - datetime.timedelta(days=7)
|
|
)
|
|
qs = qs_recent_txn.filter(user=user).union(qs_recent_txn.filter(items__user=user))
|
|
if poll:
|
|
for transaction in qs:
|
|
if transaction.can_poll_backend() and transaction.is_running():
|
|
if not context.get('synchronous'):
|
|
raise NothingInCacheException
|
|
transaction.poll_backend()
|
|
return qs
|
|
|
|
def is_relevant(self, context):
|
|
if not (getattr(context['request'], 'user', None) and context['request'].user.is_authenticated):
|
|
return False
|
|
return self.get_transactions_queryset(context).exists()
|
|
|
|
def render(self, context):
|
|
recent_transactions_template = template.loader.get_template('lingo/combo/recent_transactions.html')
|
|
context['transactions'] = self.get_transactions_queryset(context, poll=True).order_by('-start_date')
|
|
return recent_transactions_template.render(context)
|
|
|
|
|
|
@register_cell_class
|
|
class LingoBasketLinkCell(CellBase):
|
|
user_dependant = True
|
|
|
|
class Meta:
|
|
verbose_name = _('Basket Link')
|
|
|
|
@classmethod
|
|
def is_enabled(cls):
|
|
return Regie.objects.exists()
|
|
|
|
def is_relevant(self, context):
|
|
if not (getattr(context['request'], 'user', None) and context['request'].user.is_authenticated):
|
|
return False
|
|
return BasketItem.get_items_to_be_paid(context['request'].user).exists()
|
|
|
|
def render(self, context):
|
|
if not (getattr(context['request'], 'user', None) and context['request'].user.is_authenticated):
|
|
return ''
|
|
try:
|
|
context['basket_url'] = LingoBasketCell.objects.all()[0].page.get_online_url()
|
|
except IndexError:
|
|
return ''
|
|
basket_template = template.loader.get_template('lingo/combo/basket_link.html')
|
|
context['items'] = BasketItem.get_items_to_be_paid(user=context['request'].user)
|
|
context['total'] = sum(x.amount for x in context['items'])
|
|
return basket_template.render(context)
|
|
|
|
|
|
class Items(CellBase):
|
|
regie = models.CharField(_('Regie'), max_length=50, blank=True)
|
|
title = models.CharField(_('Title'), max_length=200, blank=True)
|
|
text = RichTextField(_('Text'), blank=True, null=True)
|
|
|
|
user_dependant = True
|
|
default_template_name = 'lingo/combo/items.html'
|
|
loading_message = _('Loading invoices...')
|
|
|
|
class Meta:
|
|
abstract = True
|
|
|
|
class Media:
|
|
js = (
|
|
'xstatic/jquery-ui.min.js',
|
|
'js/gadjo.js',
|
|
)
|
|
|
|
@classmethod
|
|
def is_enabled(cls):
|
|
return Regie.objects.exclude(webservice_url='').exists()
|
|
|
|
def is_relevant(self, context):
|
|
return getattr(context['request'], 'user', None) and context['request'].user.is_authenticated
|
|
|
|
def get_default_form_class(self):
|
|
fields = ['text']
|
|
if hasattr(self, 'hide_if_empty'):
|
|
fields.append('hide_if_empty')
|
|
widgets = {}
|
|
regie_qs = Regie.objects.exclude(webservice_url='')
|
|
if len(regie_qs) > 1:
|
|
regies = [('', _('All'))]
|
|
regies.extend([(r.slug, r.label) for r in regie_qs])
|
|
widgets['regie'] = Select(choices=regies)
|
|
fields.insert(0, 'regie')
|
|
return model_forms.modelform_factory(self.__class__, fields=fields, widgets=widgets)
|
|
|
|
def get_regies(self):
|
|
if self.regie:
|
|
return [Regie.objects.get(slug=self.regie)]
|
|
return Regie.objects.all()
|
|
|
|
def get_invoices(self, user):
|
|
return [], []
|
|
|
|
def get_cell_extra_context(self, context):
|
|
ctx = super().get_cell_extra_context(context)
|
|
if context.get('placeholder_search_mode'):
|
|
# don't call webservices when we're just looking for placeholders
|
|
return ctx
|
|
ctx.update({'title': self.title, 'text': self.text})
|
|
items, errors = self.get_invoices(user=context['user'])
|
|
none_date = datetime.datetime(1900, 1, 1) # to avoid None-None comparison errors
|
|
items.sort(key=lambda i: i.creation_date or none_date, reverse=True)
|
|
ctx.update(
|
|
{
|
|
'items': items,
|
|
'errors': errors,
|
|
'with_payment_limit_date': any(i.payment_limit_date for i in items),
|
|
'with_amount_paid': any(getattr(i, 'amount_paid', None) for i in items),
|
|
}
|
|
)
|
|
return ctx
|
|
|
|
def render(self, context):
|
|
self.context = context
|
|
if not context.get('synchronous'):
|
|
raise NothingInCacheException()
|
|
return super().render(context)
|
|
|
|
|
|
@register_cell_class
|
|
class ItemsHistory(Items):
|
|
class Meta:
|
|
verbose_name = _('Items History Cell')
|
|
|
|
def get_invoices(self, user):
|
|
items = []
|
|
errors = []
|
|
for r in self.get_regies():
|
|
try:
|
|
items.extend(r.get_invoices(user, history=True))
|
|
except RegieException as e:
|
|
errors.append(e)
|
|
return items, errors
|
|
|
|
|
|
@register_cell_class
|
|
class ActiveItems(Items):
|
|
hide_if_empty = models.BooleanField(_('Hide if no items'), default=False)
|
|
|
|
class Meta:
|
|
verbose_name = _('Active Items Cell')
|
|
|
|
def get_invoices(self, user):
|
|
items = []
|
|
errors = []
|
|
for r in self.get_regies():
|
|
try:
|
|
for remote_item in r.get_invoices(user, update_paid=True):
|
|
if not remote_item.paid:
|
|
items.append(remote_item)
|
|
except RegieException as e:
|
|
errors.append(e)
|
|
return items, errors
|
|
|
|
|
|
@register_cell_class
|
|
class SelfDeclaredInvoicePayment(Items):
|
|
user_dependant = False
|
|
default_template_name = 'lingo/combo/self-declared-invoice-payment.html'
|
|
|
|
class Meta:
|
|
verbose_name = _('Self declared invoice payment')
|
|
|
|
def is_relevant(self, context):
|
|
return self.is_enabled()
|
|
|
|
def render(self, context):
|
|
context['synchronous'] = True
|
|
context['page_path'] = context['request'].path
|
|
return super().render(context)
|
|
|
|
|
|
TIPI_CONTROL_PROCOTOLS = (
|
|
('pesv2', _('Indigo/PES v2')),
|
|
('rolmre', _('ROLMRE')),
|
|
)
|
|
|
|
|
|
@register_cell_class
|
|
class TipiPaymentFormCell(CellBase):
|
|
title = models.CharField(_('Title'), max_length=150, blank=True)
|
|
regies = models.CharField(
|
|
_('Regies'),
|
|
help_text=_(
|
|
'Values separated by commas. It is possible to add a label after a regie identifier. '
|
|
'Example: "1234 - Regie A,5678 - Regie B"'
|
|
),
|
|
max_length=256,
|
|
)
|
|
control_protocol = models.CharField(
|
|
_('Control protocol'), max_length=8, choices=TIPI_CONTROL_PROCOTOLS, default='pesv2'
|
|
)
|
|
exer = models.CharField('Exer', max_length=4, blank=True, help_text=_('Default value to be used in form'))
|
|
idpce = models.CharField(
|
|
'IDPCE', max_length=8, blank=True, help_text=_('Default value to be used in form')
|
|
)
|
|
idligne = models.CharField(
|
|
'IDLIGNE', max_length=6, blank=True, help_text=_('Default value to be used in form')
|
|
)
|
|
rolrec = models.CharField(
|
|
'ROLREC', max_length=2, blank=True, help_text=_('Default value to be used in form')
|
|
)
|
|
roldeb = models.CharField(
|
|
'ROLDEB', max_length=2, blank=True, help_text=_('Default value to be used in form')
|
|
)
|
|
roldet = models.CharField(
|
|
'ROLDET', max_length=13, blank=True, help_text=_('Default value to be used in form')
|
|
)
|
|
test_mode = models.BooleanField(_('Test mode'), default=False)
|
|
default_template_name = 'lingo/tipi_form.html'
|
|
|
|
class Meta:
|
|
verbose_name = _('TIPI Payment Form')
|
|
|
|
class Media:
|
|
js = ('js/tipi.js',)
|
|
|
|
@property
|
|
def url(self):
|
|
return getattr(settings, 'LINGO_TIPI_CELL_PAYMENT_URL', 'https://www.payfip.gouv.fr/tpa/paiement.web')
|
|
|
|
def get_cell_extra_context(self, context):
|
|
extra_context = super().get_cell_extra_context(context)
|
|
form_fields = self.get_default_form_class().base_fields
|
|
field_definitions = (
|
|
{'protocol': 'any', 'fields': ['exer']},
|
|
{'protocol': 'pesv2', 'fields': ['idpce', 'idligne']},
|
|
{'protocol': 'rolmre', 'fields': ['rolrec', 'roldeb', 'roldet']},
|
|
)
|
|
reference_fields = []
|
|
for definition in field_definitions:
|
|
for field in definition['fields']:
|
|
field_pattern = '[0-9]+'
|
|
# special pattern for rolrec
|
|
if field == 'rolrec':
|
|
field_pattern = '[A-Z0-9]+'
|
|
reference_fields.append(
|
|
{
|
|
'name': field,
|
|
'length': form_fields[field].max_length,
|
|
'placeholder': '0' * form_fields[field].max_length,
|
|
'pattern': field_pattern,
|
|
'protocol': definition['protocol'],
|
|
}
|
|
)
|
|
context['title'] = self.title
|
|
context['url'] = self.url
|
|
context['mode'] = 'T' if self.test_mode else 'M'
|
|
context['control_protocol'] = self.control_protocol
|
|
context['regies'] = []
|
|
context['pesv2'] = self.control_protocol == 'pesv2'
|
|
for field in reference_fields:
|
|
if getattr(self, field['name']):
|
|
field['default'] = getattr(self, field['name'])
|
|
context['reference_fields'] = reference_fields
|
|
for regie in self.regies.split(','):
|
|
regie = regie.strip()
|
|
id_search = re.search(r'(\d+)', regie)
|
|
if not id_search:
|
|
continue
|
|
regie_id = id_search.group(1)
|
|
context['regies'].append((regie_id, regie))
|
|
return extra_context
|