778 lines
31 KiB
Python
778 lines
31 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# lingo - basket and payment system
|
|
# Copyright (C) 2015 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import json
|
|
import logging
|
|
|
|
from decimal import Decimal
|
|
|
|
from dateutil import parser
|
|
import eopayment
|
|
from jsonfield import JSONField
|
|
|
|
from django import template
|
|
from django.conf import settings
|
|
from django.db import models
|
|
from django.forms import models as model_forms, Select
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.utils import timezone, dateparse
|
|
from django.core.mail import EmailMultiAlternatives
|
|
from django.core.urlresolvers import reverse
|
|
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
|
|
from django.utils.encoding import python_2_unicode_compatible
|
|
from django.utils.formats import localize
|
|
from django.utils.http import urlencode
|
|
from django.utils.six.moves.urllib import parse as urlparse
|
|
|
|
from django.contrib.auth.models import User
|
|
from django.template.loader import render_to_string
|
|
|
|
from combo.data.fields import RichTextField
|
|
from combo.data.models import CellBase
|
|
from combo.data.library import register_cell_class
|
|
from combo.utils import NothingInCacheException, aes_hex_encrypt, requests
|
|
from combo.apps.notifications.models import Notification
|
|
|
|
try:
|
|
from mellon.models import UserSAMLIdentifier
|
|
except ImportError:
|
|
UserSAMLIdentifier = None
|
|
|
|
|
|
# Transaction status value labelled "Expired" by
# Transaction.get_status_label().
EXPIRED = 9999

# (service identifier, label) pairs used as the choices of
# PaymentBackend.service.
SERVICES = [
    (eopayment.DUMMY, _('Dummy (for tests)')),
    (eopayment.SYSTEMPAY, 'systempay (Banque Populaire)'),
    (eopayment.SIPS, _('SIPS (Atos, France)')),
    (eopayment.SIPS2, _('SIPS (Atos, other countries)')),
    (eopayment.SPPLUS, _("SP+ (Caisse d'epargne)")),
    (eopayment.OGONE, _('Ingenico (formerly Ogone)')),
    (eopayment.PAYBOX, _('Paybox')),
    (eopayment.PAYZEN, _('PayZen')),
    (eopayment.TIPI, _('TIPI')),
]
|
|
|
|
def build_remote_item(data, regie):
    """Build a RemoteItem attached to ``regie`` from an invoice payload.

    ``data`` is a mapping.  Its ``created`` and ``pay_limit_date`` keys are
    mandatory (``KeyError`` is raised when one is missing); every other key
    is read with ``.get()`` and is therefore ``None`` when absent.
    """
    kwargs = {
        'id': data.get('id'),
        'regie': regie,
        'creation_date': data['created'],
        'payment_limit_date': data['pay_limit_date'],
        # the payload calls "label" what RemoteItem calls "subject"
        'subject': data.get('label'),
    }
    # remaining attributes share their name with the payload key
    same_name_keys = (
        'display_id',
        'total_amount',
        'amount',
        'has_pdf',
        'online_payment',
        'paid',
        'payment_date',
        'no_online_payment_reason',
    )
    for key in same_name_keys:
        kwargs[key] = data.get(key)
    return RemoteItem(**kwargs)
|
|
|
|
|
|
@python_2_unicode_compatible
class PaymentBackend(models.Model):
    """Stored configuration of a payment service driven through eopayment."""

    label = models.CharField(max_length=64, verbose_name=_('Label'))
    slug = models.SlugField(
        verbose_name=_('Identifier'),
        unique=True,
        help_text=_('The identifier is used in webservice calls.'),
    )
    # one of the identifiers listed in SERVICES
    service = models.CharField(
        choices=SERVICES,
        max_length=64,
        verbose_name=_('Payment Service'),
    )
    service_options = JSONField(
        verbose_name=_('Payment Service Options'),
        blank=True,
    )

    def __str__(self):
        return self.label

    def get_payment(self):
        """Return an ``eopayment.Payment`` built from the stored service
        identifier and its options."""
        service, options = self.service, self.service_options
        return eopayment.Payment(service, options)
|
|
|
|
|
|
@python_2_unicode_compatible
class Regie(models.Model):
    """A regie: a payment configuration bound to a payment backend.

    When ``webservice_url`` is set the regie is "remote" (see
    ``is_remote``): invoices are fetched from, and payments reported to,
    that webservice.  Otherwise ``get_invoice`` looks the item up among the
    basket items attached to the regie.
    """

    label = models.CharField(verbose_name=_('Label'), max_length=64)
    slug = models.SlugField(
        unique=True, verbose_name=_('Identifier'),
        help_text=_('The identifier is used in webservice calls.'))
    description = models.TextField(verbose_name=_('Description'))
    is_default = models.BooleanField(verbose_name=_('Default Regie'), default=False)
    webservice_url = models.URLField(
        _('Webservice URL to retrieve remote items'), blank=True)
    extra_fees_ws_url = models.URLField(
        _('Webservice URL to compute extra fees'), blank=True)
    payment_min_amount = models.DecimalField(
        _('Minimal payment amount'),
        max_digits=7, decimal_places=2, default=0)

    text_on_success = models.TextField(
        verbose_name=_('Custom text displayed on success'),
        blank=True, null=True)
    payment_backend = models.ForeignKey(PaymentBackend, on_delete=models.CASCADE)
    transaction_options = JSONField(blank=True, verbose_name=_('Transaction Options'))

    def is_remote(self):
        """Tell whether a webservice URL is configured for this regie."""
        return self.webservice_url != ''

    class Meta:
        verbose_name = _('Regie')
        ordering = ('-is_default', 'label',)

    def save(self, *args, **kwargs):
        """Normalise ``webservice_url`` and maintain the default flag.

        * trailing slashes are removed from ``webservice_url``, as the
          methods below append paths that start with ``/``;
        * saving a regie flagged as default removes the flag from every
          other regie;
        * saving a non-default regie while no stored regie is flagged as
          default turns it into the default one.
        """
        if self.webservice_url and self.webservice_url.endswith('/'):
            # rstrip, not strip: only the trailing separator must go
            self.webservice_url = self.webservice_url.rstrip('/')
        if self.is_default:
            qs = self.__class__.objects.filter(is_default=True)
            if self.pk:
                qs = qs.exclude(pk=self.pk)
            qs.update(is_default=False)
        elif not self.__class__.objects.filter(is_default=True).exists():
            self.is_default = True
        super(Regie, self).save(*args, **kwargs)

    def natural_key(self):
        return (self.slug,)

    def __str__(self):
        return self.label

    def get_text_on_success(self):
        """Return the custom success text, or a generic message when no
        custom text is set."""
        if self.text_on_success:
            return self.text_on_success
        return _('Your payment has been succesfully registered.')

    def get_invoices(self, user, history=False):
        """Return the remote invoices of ``user`` as a list of RemoteItem.

        The list is empty for a non-remote regie, for a falsy ``user`` and
        when the webservice response has no (or an empty) ``data`` entry.
        With ``history=True`` the ``/invoices/history/`` endpoint is queried
        instead of ``/invoices/``.
        """
        if not self.is_remote():
            return []
        if not user:
            return []
        url = self.webservice_url + '/invoices/'
        if history:
            url += 'history/'
        invoices = requests.get(
            url, user=user, remote_service='auto', cache_duration=0).json().get('data')
        if not invoices:
            return []
        return [build_remote_item(invoice, self) for invoice in invoices]

    def get_invoice(self, user, invoice_id, log_errors=True):
        """Return a single invoice.

        For a non-remote regie this is the basket item of the regie whose
        primary key is ``invoice_id``.  For a remote regie the webservice is
        queried and a RemoteItem is returned.

        Raises ``ObjectDoesNotExist`` when the webservice answers 404 or
        returns no ``data``; other HTTP errors are raised by
        ``response.raise_for_status()``.
        """
        if not self.is_remote():
            return self.basketitem_set.get(pk=invoice_id)
        url = self.webservice_url + '/invoice/%s/' % invoice_id
        response = requests.get(
            url, user=user, remote_service='auto', cache_duration=0,
            log_errors=log_errors)
        if response.status_code == 404:
            raise ObjectDoesNotExist()
        response.raise_for_status()
        invoice_data = response.json().get('data')
        if invoice_data is None:
            raise ObjectDoesNotExist()
        return build_remote_item(invoice_data, self)

    def get_invoice_pdf(self, user, invoice_id):
        """
        downloads item's file

        Returns the webservice response; raises ``PermissionDenied`` when
        the regie is not remote or ``user`` is falsy.
        """
        if self.is_remote() and user:
            url = self.webservice_url + '/invoice/%s/pdf/' % invoice_id
            return requests.get(url, user=user, remote_service='auto', cache_duration=0)
        raise PermissionDenied

    def pay_invoice(self, invoice_id, transaction_id, transaction_date):
        """Report the payment of an invoice to the webservice and return the
        decoded JSON response.

        ``transaction_date`` must provide ``strftime``; it is sent formatted
        as ``%Y-%m-%dT%H:%M:%S``.
        """
        url = self.webservice_url + '/invoice/%s/pay/' % invoice_id
        data = {'transaction_id': transaction_id,
                'transaction_date': transaction_date.strftime('%Y-%m-%dT%H:%M:%S')}
        headers = {'content-type': 'application/json'}
        return requests.post(url, remote_service='auto',
                             data=json.dumps(data), headers=headers).json()

    def as_api_dict(self):
        return {'id': self.slug,
                'text': self.label,
                'description': self.description}

    def compute_extra_fees(self, user):
        """Recompute the extra-fee basket items of ``user`` for this regie.

        Nothing happens when no extra fees webservice is configured.  The
        regular (non extra-fee) items waiting for payment are posted to the
        webservice; on success the existing extra-fee items are replaced by
        the ones it returns.  When there is no regular item left, existing
        extra fees are simply deleted.  On webservice failure an error is
        logged and existing extra fees are left untouched.
        """
        if not self.extra_fees_ws_url:
            return
        post_data = {'data': []}
        basketitems = BasketItem.get_items_to_be_paid(user).filter(regie=self)
        for basketitem in basketitems.filter(extra_fee=False):
            basketitem_data = {
                'subject': basketitem.subject,
                'source_url': basketitem.source_url,
                'details': basketitem.details,
                'amount': str(basketitem.amount),
                'request_data': basketitem.request_data
            }
            post_data['data'].append(basketitem_data)
        if not post_data['data']:
            basketitems.filter(extra_fee=True).delete()
            return
        response = requests.post(
            self.extra_fees_ws_url,
            remote_service='auto',
            data=json.dumps(post_data),
            headers={'content-type': 'application/json'})
        if response.status_code != 200 or response.json().get('err'):
            logger = logging.getLogger(__name__)
            logger.error('failed to compute extra fees (user: %r)', user)
            return
        basketitems.filter(extra_fee=True).delete()
        # a successful answer without "data" (or with a null one) means no
        # extra fee; do not crash iterating over None
        for extra_fee in response.json().get('data') or []:
            BasketItem(user=user, regie=self,
                       subject=extra_fee.get('subject'),
                       amount=extra_fee.get('amount'),
                       extra_fee=True,
                       user_cancellable=False).save()

    def get_remote_pending_invoices(self):
        """Return the ``data`` entry of the ``/users/with-pending-invoices/``
        webservice response.

        An empty dict is returned when the regie is not remote, when
        UserSAMLIdentifier is unavailable or when the request failed.
        """
        if not self.is_remote() or UserSAMLIdentifier is None:
            return {}
        url = self.webservice_url + '/users/with-pending-invoices/'
        response = requests.get(url, remote_service='auto', cache_duration=0,
                                log_errors=False, without_user=True)
        if not response.ok:
            return {}
        return response.json()['data']

    def get_notification_namespace(self):
        return 'invoice-%s' % self.slug

    def get_notification_id(self, invoice):
        return '%s:%s' % (self.get_notification_namespace(), invoice.id)

    def get_notification_reminder_id(self, invoice):
        return '%s:reminder-%s' % (self.get_notification_namespace(), invoice.id)

    def notify_invoice(self, user, invoice):
        """Create, refresh or forget the notification of ``invoice``.

        * payment limit date in the past: both the notification and its
          reminder are forgotten;
        * payment limit date at least LINGO_NEW_INVOICES_REMIND_DELTA days
          ahead: a regular notification is issued;
        * otherwise: a reminder notification is issued.

        An email is sent the first time a given notification id is issued.
        Returns the notification id that was used (the regular id when the
        invoice is out of date).
        """
        today = timezone.now().date()
        # datetime.timedelta rather than the undocumented re-export
        # django.utils.timezone.timedelta
        remind_delta = datetime.timedelta(days=settings.LINGO_NEW_INVOICES_REMIND_DELTA)
        active_items_cell = ActiveItems.objects.first()
        if active_items_cell:
            items_page_url = active_items_cell.page.get_online_url()
        else:
            items_page_url = ''
        notification_id = self.get_notification_id(invoice)
        notification_reminder_id = self.get_notification_reminder_id(invoice)
        if invoice.payment_limit_date < today:
            # invoice is out of date
            Notification.objects.find(user, notification_id).forget()
            Notification.objects.find(user, notification_reminder_id).forget()
        else:
            # invoice can be paid
            if invoice.payment_limit_date >= today + remind_delta:
                message = _('Invoice %s to pay') % invoice.subject
            else:
                message = _('Reminder: invoice %s to pay') % invoice.subject
                notification_id = notification_reminder_id
            if not Notification.objects.find(user, notification_id).exists():
                self.notify_remote_invoice_by_email(user, invoice)
            Notification.notify(user,
                                summary=message,
                                id=notification_id,
                                url=items_page_url,
                                end_timestamp=invoice.payment_limit_date)
        return notification_id

    def notify_new_remote_invoices(self):
        """Notify the users of their pending remote invoices, then forget
        the notifications of this regie that were not (re)issued."""
        if UserSAMLIdentifier is None:
            # remote invoices retrieval requires SAML
            return
        pending_invoices = self.get_remote_pending_invoices()
        notification_ids = []
        for uuid, items in pending_invoices.items():
            try:
                user = UserSAMLIdentifier.objects.get(name_id=uuid).user
            except UserSAMLIdentifier.DoesNotExist:
                continue
            for invoice in items['invoices']:
                remote_invoice = build_remote_item(invoice, self)
                if remote_invoice.total_amount >= self.payment_min_amount:
                    notification_ids.append(
                        self.notify_invoice(user, remote_invoice))
        # clear old notifications for invoice not in the source anymore
        Notification.objects.namespace(self.get_notification_namespace())\
            .exclude(external_id__in=notification_ids) \
            .forget()

    def notify_remote_invoice_by_email(self, user, invoice):
        """Email ``user`` about ``invoice`` (text and HTML bodies), with the
        invoice PDF attached when ``invoice.has_pdf`` is set."""
        subject_template = 'lingo/combo/invoice_email_notification_subject.txt'
        text_body_template = 'lingo/combo/invoice_email_notification_body.txt'
        html_body_template = 'lingo/combo/invoice_email_notification_body.html'

        payment_url = reverse('view-item', kwargs={'regie_id': self.id,
                                                   'item_crypto_id': invoice.crypto_id})
        ctx = settings.TEMPLATE_VARS.copy()
        ctx['invoice'] = invoice
        ctx['payment_url'] = urlparse.urljoin(settings.SITE_BASE_URL, payment_url)
        ctx['portal_url'] = settings.SITE_BASE_URL
        subject = render_to_string([subject_template], ctx).strip()
        text_body = render_to_string([text_body_template], ctx)
        html_body = render_to_string([html_body_template], ctx)
        message = EmailMultiAlternatives(subject, text_body, to=[user.email])
        message.attach_alternative(html_body, 'text/html')
        if invoice.has_pdf:
            invoice_pdf = self.get_invoice_pdf(user, invoice.id)
            message.attach('%s.pdf' % invoice.id, invoice_pdf.content, 'application/pdf')
        message.send()
|
|
|
|
|
|
class BasketItem(models.Model):
    """An item to be paid, attached to a regie and (optionally) to a user."""

    # on_delete is spelled out, as already done for Regie.payment_backend:
    # CASCADE was the implicit default before Django 2.0, where the argument
    # became mandatory.
    user = models.ForeignKey(settings.AUTH_USER_MODEL, null=True,
                             on_delete=models.CASCADE)
    regie = models.ForeignKey(Regie, on_delete=models.CASCADE)
    subject = models.CharField(verbose_name=_('Subject'), max_length=200)
    source_url = models.URLField(_('Source URL'), blank=True)
    details = models.TextField(verbose_name=_('Details'), blank=True)
    amount = models.DecimalField(verbose_name=_('Amount'),
                                 decimal_places=2, max_digits=8)
    request_data = JSONField(blank=True)
    extra_fee = models.BooleanField(default=False)
    user_cancellable = models.BooleanField(default=True)
    creation_date = models.DateTimeField(auto_now_add=True)
    cancellation_date = models.DateTimeField(null=True)
    waiting_date = models.DateTimeField(null=True)
    payment_date = models.DateTimeField(null=True)
    notification_date = models.DateTimeField(null=True)
    capture_date = models.DateField(null=True)

    class Meta:
        ordering = ['regie', 'extra_fee', 'subject']

    @classmethod
    def get_items_to_be_paid(cls, user):
        """Return the queryset of the items of ``user`` that are neither
        paid, nor waiting, nor cancelled."""
        return cls.objects.filter(
            user=user,
            payment_date__isnull=True,
            waiting_date__isnull=True,
            cancellation_date__isnull=True)

    def notify(self, status):
        """POST a ``status`` trigger to the source of the item.

        Nothing is done for an item without ``source_url``.  For the
        ``paid`` status the message carries the details of the first
        accepted/paid transaction of the item (``IndexError`` is raised when
        there is none).  HTTP errors are raised by ``raise_for_status()``.
        """
        if not self.source_url:
            return
        # NOTE(review): plain concatenation, so source_url is presumably
        # expected to end with a slash -- confirm against callers.
        url = self.source_url + 'jump/trigger/%s' % status
        message = {'result': 'ok'}
        if status == 'paid':
            transaction = self.transaction_set.filter(
                status__in=(eopayment.ACCEPTED, eopayment.PAID))[0]
            message['transaction_id'] = transaction.id
            message['order_id'] = transaction.order_id
            message['bank_transaction_id'] = transaction.bank_transaction_id
            message['bank_data'] = transaction.bank_data
        headers = {'content-type': 'application/json'}
        r = requests.post(url, remote_service='auto',
                          data=json.dumps(message), headers=headers, timeout=15)
        r.raise_for_status()

    def notify_payment(self):
        """Notify the source of the payment, record the notification date
        and have the regie recompute the extra fees of the user."""
        self.notify('paid')
        self.notification_date = timezone.now()
        self.save()
        self.regie.compute_extra_fees(user=self.user)

    def notify_cancellation(self, notify_origin=False):
        """Record the cancellation of the item (notifying its source first
        when ``notify_origin`` is set) and have the regie recompute the
        extra fees of the user."""
        if notify_origin:
            self.notify('cancelled')
        self.cancellation_date = timezone.now()
        self.save()
        self.regie.compute_extra_fees(user=self.user)

    @property
    def total_amount(self):
        # same attribute name as RemoteItem.total_amount
        return self.amount
|
|
|
|
|
|
class RemoteItem(object):
    """In-memory representation of an invoice provided by a regie
    webservice (see build_remote_item)."""

    # stays None unless a truthy payment date is given to the constructor
    payment_date = None

    def __init__(self, id, regie, creation_date, payment_limit_date,
                 total_amount, amount, display_id, subject, has_pdf,
                 online_payment, paid, payment_date, no_online_payment_reason):
        # identification
        self.id = id
        self.regie = regie

        # both dates are parsed with django.utils.dateparse.parse_date
        self.creation_date = dateparse.parse_date(creation_date)
        self.payment_limit_date = dateparse.parse_date(payment_limit_date)

        # amounts are converted to Decimal
        self.total_amount = Decimal(total_amount)
        self.amount = Decimal(amount)

        # the identifier is displayed when no display id is given
        self.display_id = display_id or id
        self.subject = subject
        self.has_pdf = has_pdf

        # payment status
        self.online_payment = online_payment
        self.paid = paid
        self.no_online_payment_reason = no_online_payment_reason
        if payment_date:
            self.payment_date = parser.parse(payment_date)

    @property
    def no_online_payment_reason_details(self):
        """Text explaining why the invoice cannot be paid online.

        The LINGO_NO_ONLINE_PAYMENT_REASONS setting is looked up first, the
        built-in texts below are the fallback; the result is None for a
        reason known to neither.
        """
        reason = self.no_online_payment_reason
        builtin_details = {
            'litigation': _('This invoice is in litigation.'),
            'autobilling': _('Autobilling has been set for this invoice.'),
            'past-due-date': _('Due date is over.'),
        }
        overrides = settings.LINGO_NO_ONLINE_PAYMENT_REASONS
        return overrides.get(reason, builtin_details.get(reason))

    @property
    def crypto_id(self):
        """Identifier of the item, encrypted with the project SECRET_KEY."""
        plain_id = str(self.id)
        return aes_hex_encrypt(settings.SECRET_KEY, plain_id)
|
|
|
|
|
|
class Transaction(models.Model):
    """A payment transaction covering local basket items and/or remote
    items (stored as a comma-separated list of identifiers)."""

    # on_delete is spelled out, as already done for Regie.payment_backend:
    # CASCADE was the implicit default before Django 2.0, where the argument
    # became mandatory.
    regie = models.ForeignKey(Regie, null=True, on_delete=models.CASCADE)
    items = models.ManyToManyField(BasketItem, blank=True)
    # comma-separated remote item identifiers
    remote_items = models.CharField(max_length=512)
    # comma-separated identifiers of the remote items whose payment could
    # not be notified yet (see notify_remote_items_of_payments)
    to_be_paid_remote_items = models.CharField(max_length=512, null=True)
    start_date = models.DateTimeField(auto_now_add=True)
    end_date = models.DateTimeField(null=True)
    bank_data = JSONField(blank=True)
    order_id = models.CharField(max_length=200)
    bank_transaction_id = models.CharField(max_length=200, null=True)
    user = models.ForeignKey(settings.AUTH_USER_MODEL, null=True,
                             on_delete=models.CASCADE)
    status = models.IntegerField(null=True)
    amount = models.DecimalField(default=0, max_digits=7, decimal_places=2)

    def is_remote(self):
        """Tell whether the transaction covers remote items."""
        return self.remote_items != ''

    def get_user_name(self):
        """Return the full name of the user, or a generic label when the
        transaction has no user."""
        if self.user:
            return self.user.get_full_name()
        return _('Anonymous User')

    def is_paid(self):
        return self.status in (eopayment.PAID, eopayment.ACCEPTED)

    def get_status_label(self):
        """Return the label of the status, ``Unknown`` for any status that
        is not listed."""
        return {
            0: _('Running'),
            eopayment.PAID: _('Paid'),
            eopayment.ACCEPTED: _('Paid (accepted)'),
            eopayment.CANCELLED: _('Cancelled'),
            EXPIRED: _('Expired')
        }.get(self.status) or _('Unknown')

    def first_notify_remote_items_of_payments(self):
        """Notify the payment of every remote item of the transaction."""
        self.notify_remote_items_of_payments(self.remote_items)

    def retry_notify_remote_items_of_payments(self):
        """Retry the notifications that previously failed."""
        self.notify_remote_items_of_payments(self.to_be_paid_remote_items)

    def notify_remote_items_of_payments(self, items):
        """Report to the regie the payment of the given remote items.

        ``items`` is a comma-separated string of remote item identifiers.
        Nothing is done when it is empty or when the transaction is not
        paid.  For each item successfully reported, a local BasketItem
        mirroring it is created and attached to the transaction; the
        identifiers that failed are stored in ``to_be_paid_remote_items``
        (None when everything went through) for a later retry.
        """
        logger = logging.getLogger(__name__)
        if not items:
            return
        if not self.is_paid():
            return

        regie = self.regie
        to_be_paid_remote_items = []
        for item_id in items.split(','):
            try:
                remote_item = regie.get_invoice(user=self.user, invoice_id=item_id)
                regie.pay_invoice(item_id, self.order_id, self.end_date)
            except Exception:
                # deliberate catch-all: any failure keeps the item in the
                # retry list, with the traceback logged
                to_be_paid_remote_items.append(item_id)
                logger.exception(u'unable to notify payment for remote item %s from transaction %s',
                                 item_id, self)
            else:
                logger.info(u'notified payment for remote item %s from transaction %s',
                            item_id, self)
                subject = _('Invoice #%s') % remote_item.display_id
                local_item = BasketItem.objects.create(user=self.user,
                                                       regie=regie,
                                                       source_url='',
                                                       subject=subject,
                                                       amount=remote_item.amount,
                                                       payment_date=self.end_date)
                self.items.add(local_item)

        self.to_be_paid_remote_items = ','.join(to_be_paid_remote_items) or None
        self.save(update_fields=['to_be_paid_remote_items'])
|
|
|
|
|
|
class TransactionOperation(models.Model):
    """An operation (validation or cancellation) recorded against a
    transaction, along with the result returned by the bank."""

    OPERATIONS = [
        ('validation', _('Validation')),
        ('cancellation', _('Cancellation')),
    ]
    # on_delete is spelled out, as already done for Regie.payment_backend:
    # CASCADE was the implicit default before Django 2.0, where the argument
    # became mandatory.
    transaction = models.ForeignKey(Transaction, on_delete=models.CASCADE)
    kind = models.CharField(max_length=65, choices=OPERATIONS)
    amount = models.DecimalField(decimal_places=2, max_digits=8)
    creation_date = models.DateTimeField(auto_now_add=True)
    bank_result = JSONField(blank=True)
|
|
|
|
|
|
@register_cell_class
class LingoBasketCell(CellBase):
    """Cell displaying the basket (items to be paid) of the current user,
    grouped by regie."""

    user_dependant = True

    class Meta:
        verbose_name = _('Basket')

    class Media:
        js = ('xstatic/jquery-ui.min.js', 'js/gadjo.js',)

    @classmethod
    def is_enabled(cls):
        """The cell is available as soon as one regie exists."""
        return Regie.objects.exists()

    def is_relevant(self, context):
        """Only relevant for an authenticated user having items to pay."""
        user = getattr(context['request'], 'user', None)
        if not (user and user.is_authenticated()):
            return False
        return BasketItem.get_items_to_be_paid(user).exists()

    def get_badge(self, context):
        """Return ``{'badge': <localized total>€}``, or None for an
        unauthenticated user or an empty basket."""
        user = getattr(context['request'], 'user', None)
        if not (user and user.is_authenticated()):
            return
        items = BasketItem.get_items_to_be_paid(user)
        if not items:
            return
        total = sum(x.amount for x in items)
        if total == int(total):
            # display whole amounts without decimals
            total = int(total)
        return {'badge': _(u'%s€') % localize(total)}

    def render(self, context):
        """Render the basket template with ``regies``: one
        ``{'regie', 'items', 'total'}`` dict per regie, sorted by regie
        label."""
        basket_template = template.loader.get_template('lingo/combo/basket.html')
        items = BasketItem.get_items_to_be_paid(context['request'].user)
        regies = {}
        for item in items:
            if item.regie_id not in regies:
                regies[item.regie_id] = {'items': [], 'regie': item.regie}
            regies[item.regie_id]['items'].append(item)

        for group in regies.values():
            group['total'] = sum(x.amount for x in group['items'])

        context['regies'] = sorted(regies.values(), key=lambda x: x['regie'].label)
        return basket_template.render(context)
|
|
|
|
|
|
@register_cell_class
class LingoRecentTransactionsCell(CellBase):
    """Cell listing the transactions started by the current user during the
    last seven days."""

    user_dependant = True

    class Meta:
        verbose_name = _('Recent Transactions')

    @classmethod
    def is_enabled(cls):
        """The cell is available as soon as one regie exists."""
        return Regie.objects.exists()

    def _get_recent_transactions(self, user):
        # transactions of the user started within the last 7 days
        return Transaction.objects.filter(
            user=user,
            start_date__gte=timezone.now() - datetime.timedelta(days=7))

    def is_relevant(self, context):
        """Only relevant for an authenticated user with recent
        transactions."""
        user = getattr(context['request'], 'user', None)
        if not (user and user.is_authenticated()):
            return False
        # exists() instead of len(): no need to load every row to know
        # whether there is at least one
        return self._get_recent_transactions(user).exists()

    def render(self, context):
        """Render the recent transactions, most recent first."""
        recent_transactions_template = template.loader.get_template(
            'lingo/combo/recent_transactions.html')
        context['transactions'] = self._get_recent_transactions(
            context['request'].user).order_by('-start_date')
        return recent_transactions_template.render(context)
|
|
|
|
|
|
@register_cell_class
class LingoBasketLinkCell(CellBase):
    """Cell displaying a link to the page holding the basket cell."""

    user_dependant = True

    class Meta:
        verbose_name = _('Basket Link')

    @classmethod
    def is_enabled(cls):
        """The cell is available as soon as one regie exists."""
        return Regie.objects.exists()

    def is_relevant(self, context):
        """Only relevant for an authenticated user having items to pay."""
        user = getattr(context['request'], 'user', None)
        if not (user and user.is_authenticated()):
            return False
        return BasketItem.get_items_to_be_paid(user).exists()

    def render(self, context):
        """Render the link to the basket page, with the items to be paid and
        their total.

        An empty string is returned for an unauthenticated user or when
        there is no basket cell.
        """
        user = getattr(context['request'], 'user', None)
        if not (user and user.is_authenticated()):
            return ''
        try:
            # only the indexing is guarded: an IndexError raised while
            # computing the URL must not be mistaken for a missing cell
            basket_cell = LingoBasketCell.objects.all()[0]
        except IndexError:
            return ''
        context['basket_url'] = basket_cell.page.get_online_url()
        basket_template = template.loader.get_template('lingo/combo/basket_link.html')
        context['items'] = BasketItem.get_items_to_be_paid(user=context['request'].user)
        context['total'] = sum(x.amount for x in context['items'])
        return basket_template.render(context)
|
|
|
|
|
|
class Items(CellBase):
    """Abstract base of the cells listing invoices coming from remote
    regies; subclasses provide ``get_invoices``."""

    # slug of the regie to restrict the cell to; empty means all regies
    regie = models.CharField(_('Regie'), max_length=50, blank=True)
    title = models.CharField(_('Title'), max_length=200, blank=True)
    text = RichTextField(_('Text'), blank=True, null=True)

    user_dependant = True
    template_name = 'lingo/combo/items.html'
    loading_message = _('Loading invoices...')

    class Meta:
        abstract = True

    class Media:
        js = ('xstatic/jquery-ui.min.js', 'js/gadjo.js',)

    @classmethod
    def is_enabled(cls):
        """The cell is available as soon as one regie has a webservice
        URL."""
        return Regie.objects.exclude(webservice_url='').exists()

    def is_relevant(self, context):
        """Truthy for an authenticated user only."""
        return (getattr(context['request'], 'user', None) and context['request'].user.is_authenticated())

    def get_default_form_class(self):
        """Return the edit form class: title and text, preceded by a regie
        selector when more than one remote regie exists."""
        fields = ['title', 'text']
        widgets = {}
        remote_regies = Regie.objects.exclude(webservice_url='')
        if remote_regies.count() > 1:
            regies = [('', _('All'))]
            regies.extend([(r.slug, r.label) for r in remote_regies])
            widgets['regie'] = Select(choices=regies)
            fields.insert(0, 'regie')
        return model_forms.modelform_factory(self.__class__, fields=fields, widgets=widgets)

    def get_regies(self):
        """Return the regies covered by the cell: the configured one (as a
        single-element list) or all of them."""
        if self.regie:
            return [Regie.objects.get(slug=self.regie)]
        return Regie.objects.all()

    def get_invoices(self, user):
        # overridden by concrete cells
        return []

    def get_cell_extra_context(self, context):
        """Add the title, the text and the invoices of the user (sorted by
        creation date, most recent first) to the cell context."""
        ctx = super(Items, self).get_cell_extra_context(context)
        if context.get('placeholder_search_mode'):
            # don't call webservices when we're just looking for placeholders
            return ctx
        ctx.update({'title': self.title, 'text': self.text})
        items = self.get_invoices(user=context['user'])
        items.sort(key=lambda i: i.creation_date, reverse=True)
        ctx.update({'items': items})
        return ctx

    def render(self, context):
        """Render the cell; raises NothingInCacheException unless the
        context is flagged ``synchronous``."""
        self.context = context
        if not context.get('synchronous'):
            raise NothingInCacheException()
        return super(Items, self).render(context)
|
|
|
|
|
|
@register_cell_class
class ItemsHistory(Items):
    """Cell listing the invoice history of the regies it covers."""

    class Meta:
        verbose_name = _('Items History Cell')

    def get_invoices(self, user):
        """Return the history invoices of ``user``, all regies of the cell
        combined."""
        return [
            invoice
            for regie in self.get_regies()
            for invoice in regie.get_invoices(user, history=True)
        ]
|
|
|
|
|
|
@register_cell_class
class ActiveItems(Items):
    """Cell listing the current invoices of the regies it covers."""

    class Meta:
        verbose_name = _('Active Items Cell')

    def get_invoices(self, user):
        """Return the invoices of ``user``, all regies of the cell
        combined."""
        return [
            invoice
            for regie in self.get_regies()
            for invoice in regie.get_invoices(user)
        ]
|
|
|
|
|
|
@register_cell_class
class SelfDeclaredInvoicePayment(Items):
    """Cell for self declared invoice payment; unlike the other Items cells
    it is not user dependant and is always rendered synchronously."""

    template_name = 'lingo/combo/self-declared-invoice-payment.html'
    user_dependant = False

    class Meta:
        verbose_name = _('Self declared invoice payment')

    def is_relevant(self, context):
        """Relevant whenever the cell class is enabled, whoever the user."""
        return self.is_enabled()

    def render(self, context):
        """Flag the context as synchronous, expose the request path as
        ``page_path`` and delegate to the parent rendering."""
        context['synchronous'] = True
        request = context['request']
        context['page_path'] = request.path
        return super(SelfDeclaredInvoicePayment, self).render(context)
|
|
|
|
|
|
# Choices of TipiPaymentFormCell.control_protocol, as (value, label) pairs.
# The name is misspelled ("PROCOTOLS") but is kept as is since it is
# referenced under this spelling.
TIPI_CONTROL_PROCOTOLS = (
    ('pesv2', _('Indigo/PES v2')),
    ('rolmre', _('ROLMRE')),
)
|
|
|
|
@register_cell_class
class TipiPaymentFormCell(CellBase):
    """Cell displaying a form sending the user to the TIPI payment
    service."""

    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.URLField(
        _('TIPI payment service URL'),
        default='https://www.tipi.budget.gouv.fr/tpa/paiement.web')
    regies = models.CharField(
        _('Regies'), help_text=_('separated by commas'), max_length=256)
    control_protocol = models.CharField(
        _('Control protocol'), max_length=8,
        choices=TIPI_CONTROL_PROCOTOLS, default='pesv2')
    # default values of the reference fields of the form
    exer = models.CharField(
        'Exer', max_length=4, blank=True,
        help_text=_('Default value to be used in form'))
    idpce = models.CharField(
        'IDPCE', max_length=8, blank=True,
        help_text=_('Default value to be used in form'))
    idligne = models.CharField(
        'IDLIGNE', max_length=6, blank=True,
        help_text=_('Default value to be used in form'))
    rolrec = models.CharField(
        'ROLREC', max_length=2, blank=True,
        help_text=_('Default value to be used in form'))
    roldeb = models.CharField(
        'ROLDEB', max_length=2, blank=True,
        help_text=_('Default value to be used in form'))
    roldet = models.CharField(
        'ROLDET', max_length=13, blank=True,
        help_text=_('Default value to be used in form'))
    test_mode = models.BooleanField(_('Test mode'), default=False)
    template_name = 'lingo/tipi_form.html'

    class Meta:
        verbose_name = _('TIPI Payment Form')

    class Media:
        js = ('js/tipi.js',)

    def get_cell_extra_context(self, context):
        """Fill ``context`` with what the TIPI form template needs and
        return the extra context computed by the parent class.

        The values (title, url, mode, control protocol, regies, reference
        fields) are written into ``context`` itself, not into the returned
        dict.
        """
        extra_context = super(TipiPaymentFormCell, self).get_cell_extra_context(context)
        form_fields = self.get_default_form_class().base_fields

        # reference fields, grouped by the control protocol using them
        fields_by_protocol = (
            ('any', ['exer']),
            ('pesv2', ['idligne', 'idpce']),
            ('rolmre', ['rolrec', 'roldeb', 'roldet']),
        )
        reference_fields = []
        for protocol, names in fields_by_protocol:
            for name in names:
                max_length = form_fields[name].max_length
                description = {
                    'name': name,
                    'length': max_length,
                    'placeholder': '0' * max_length,
                    # rolrec also accepts uppercase letters, other fields
                    # are digits only
                    'pattern': '[A-Z0-9]+' if name == 'rolrec' else '[0-9]+',
                    'protocol': protocol,
                }
                default_value = getattr(self, name)
                if default_value:
                    description['default'] = default_value
                reference_fields.append(description)

        context['title'] = self.title
        context['url'] = self.url
        context['mode'] = 'T' if self.test_mode else 'M'
        context['control_protocol'] = self.control_protocol
        # identifiers from the comma-separated field, blanks dropped
        stripped_ids = [regie.strip() for regie in self.regies.split(',')]
        context['regies'] = [regie_id for regie_id in stripped_ids if regie_id]
        context['reference_fields'] = reference_fields
        return extra_context
|