combo/combo/apps/lingo/models.py

778 lines
31 KiB
Python

# -*- coding: utf-8 -*-
#
# lingo - basket and payment system
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import logging
from decimal import Decimal
from dateutil import parser
import eopayment
from jsonfield import JSONField
from django import template
from django.conf import settings
from django.db import models
from django.forms import models as model_forms, Select
from django.utils.translation import ugettext_lazy as _
from django.utils import timezone, dateparse
from django.core.mail import EmailMultiAlternatives
from django.core.urlresolvers import reverse
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
from django.utils.encoding import python_2_unicode_compatible
from django.utils.formats import localize
from django.utils.http import urlencode
from django.utils.six.moves.urllib import parse as urlparse
from django.contrib.auth.models import User
from django.template.loader import render_to_string
from combo.data.fields import RichTextField
from combo.data.models import CellBase
from combo.data.library import register_cell_class
from combo.utils import NothingInCacheException, aes_hex_encrypt, requests
from combo.apps.notifications.models import Notification
try:
from mellon.models import UserSAMLIdentifier
except ImportError:
UserSAMLIdentifier = None
# Transaction status value labelled "Expired" by Transaction.get_status_label().
EXPIRED = 9999

# (eopayment service identifier, human readable label) pairs, used as the
# choices of PaymentBackend.service.
SERVICES = [
    (eopayment.DUMMY, _('Dummy (for tests)')),
    (eopayment.SYSTEMPAY, 'systempay (Banque Populaire)'),
    (eopayment.SIPS, _('SIPS (Atos, France)')),
    (eopayment.SIPS2, _('SIPS (Atos, other countries)')),
    (eopayment.SPPLUS, _("SP+ (Caisse d'epargne)")),
    (eopayment.OGONE, _('Ingenico (formerly Ogone)')),
    (eopayment.PAYBOX, _('Paybox')),
    (eopayment.PAYZEN, _('PayZen')),
    (eopayment.TIPI, _('TIPI')),
]
def build_remote_item(data, regie):
    """Turn one invoice dictionary received from a webservice into a RemoteItem.

    The ``created`` and ``pay_limit_date`` keys are mandatory (a KeyError is
    raised when one of them is missing); every other key is read with
    ``dict.get`` and is passed as None when absent.
    """
    item_kwargs = {
        'id': data.get('id'),
        'regie': regie,
        'creation_date': data['created'],
        'payment_limit_date': data['pay_limit_date'],
        'display_id': data.get('display_id'),
        'total_amount': data.get('total_amount'),
        'amount': data.get('amount'),
        # the webservice calls it "label", RemoteItem calls it "subject"
        'subject': data.get('label'),
        'has_pdf': data.get('has_pdf'),
        'online_payment': data.get('online_payment'),
        'paid': data.get('paid'),
        'payment_date': data.get('payment_date'),
        'no_online_payment_reason': data.get('no_online_payment_reason'),
    }
    return RemoteItem(**item_kwargs)
@python_2_unicode_compatible
class PaymentBackend(models.Model):
    """Stored configuration of one eopayment service."""

    label = models.CharField(max_length=64, verbose_name=_('Label'))
    slug = models.SlugField(
        verbose_name=_('Identifier'),
        unique=True,
        help_text=_('The identifier is used in webservice calls.'))
    # one of the identifiers listed in SERVICES
    service = models.CharField(
        max_length=64, choices=SERVICES, verbose_name=_('Payment Service'))
    # options handed over as-is to eopayment.Payment
    service_options = JSONField(verbose_name=_('Payment Service Options'), blank=True)

    def __str__(self):
        return self.label

    def get_payment(self):
        """Return an eopayment.Payment built from the service and its options."""
        service_name, options = self.service, self.service_options
        return eopayment.Payment(service_name, options)
@python_2_unicode_compatible
class Regie(models.Model):
    """A billing entity; its items are paid through a PaymentBackend.

    A regie is "remote" when ``webservice_url`` is set: invoices are then
    retrieved from, and payments reported to, that webservice.
    """

    label = models.CharField(verbose_name=_('Label'), max_length=64)
    slug = models.SlugField(unique=True, verbose_name=_('Identifier'),
                            help_text=_('The identifier is used in webservice calls.'))
    description = models.TextField(verbose_name=_('Description'))
    is_default = models.BooleanField(verbose_name=_('Default Regie'), default=False)
    webservice_url = models.URLField(_('Webservice URL to retrieve remote items'),
                                     blank=True)
    extra_fees_ws_url = models.URLField(_('Webservice URL to compute extra fees'),
                                        blank=True)
    payment_min_amount = models.DecimalField(_('Minimal payment amount'),
                                             max_digits=7, decimal_places=2, default=0)
    text_on_success = models.TextField(
        verbose_name=_('Custom text displayed on success'),
        blank=True, null=True)
    payment_backend = models.ForeignKey(PaymentBackend, on_delete=models.CASCADE)
    transaction_options = JSONField(blank=True, verbose_name=_('Transaction Options'))

    class Meta:
        verbose_name = _('Regie')
        ordering = ('-is_default', 'label',)

    def is_remote(self):
        """Tell whether a webservice URL is configured for this regie."""
        return self.webservice_url != ''

    def save(self, *args, **kwargs):
        """Normalise the webservice URL and keep a single default regie.

        Saving a regie flagged as default removes the flag from all other
        regies; saving a non-default regie while no default exists promotes it.
        """
        if self.webservice_url:
            # only trailing slashes are unwanted: URLs are later extended with
            # paths starting with '/'
            self.webservice_url = self.webservice_url.rstrip('/')
        if self.is_default:
            qs = self.__class__.objects.filter(is_default=True)
            if self.pk:
                qs = qs.exclude(pk=self.pk)
            qs.update(is_default=False)
        elif not self.__class__.objects.filter(is_default=True).exists():
            self.is_default = True
        super(Regie, self).save(*args, **kwargs)

    def natural_key(self):
        return (self.slug,)

    def __str__(self):
        return self.label

    def get_text_on_success(self):
        """Return the custom success text, or a generic message when unset."""
        if self.text_on_success:
            return self.text_on_success
        return _('Your payment has been succesfully registered.')

    def get_invoices(self, user, history=False):
        """Return the RemoteItem list of ``user`` fetched from the webservice.

        With ``history`` set, the ``/invoices/history/`` endpoint is queried
        instead of ``/invoices/``. An empty list is returned for a non-remote
        regie, a falsy user or a response without data.
        """
        if not self.is_remote() or not user:
            return []
        url = self.webservice_url + '/invoices/'
        if history:
            url += 'history/'
        payload = requests.get(url, user=user, remote_service='auto', cache_duration=0).json()
        return [build_remote_item(item, self) for item in payload.get('data') or []]

    def get_invoice(self, user, invoice_id, log_errors=True):
        """Return a single invoice.

        For a non-remote regie this is the local BasketItem with that primary
        key; otherwise a RemoteItem built from the webservice answer.

        Raises ObjectDoesNotExist when the webservice answers 404 or sends no
        data; other HTTP errors are raised by ``raise_for_status``.
        """
        if not self.is_remote():
            return self.basketitem_set.get(pk=invoice_id)
        url = self.webservice_url + '/invoice/%s/' % invoice_id
        response = requests.get(url, user=user, remote_service='auto', cache_duration=0,
                                log_errors=log_errors)
        if response.status_code == 404:
            raise ObjectDoesNotExist()
        response.raise_for_status()
        # decode the body once and reuse it
        data = response.json().get('data')
        if data is None:
            raise ObjectDoesNotExist()
        return build_remote_item(data, self)

    def get_invoice_pdf(self, user, invoice_id):
        """
        downloads item's file

        Returns the webservice response; raises PermissionDenied when the
        regie is not remote or no user is given.
        """
        if self.is_remote() and user:
            url = self.webservice_url + '/invoice/%s/pdf/' % invoice_id
            return requests.get(url, user=user, remote_service='auto', cache_duration=0)
        raise PermissionDenied

    def pay_invoice(self, invoice_id, transaction_id, transaction_date):
        """Report a payment to the webservice and return its decoded JSON answer."""
        url = self.webservice_url + '/invoice/%s/pay/' % invoice_id
        data = {'transaction_id': transaction_id,
                'transaction_date': transaction_date.strftime('%Y-%m-%dT%H:%M:%S')}
        headers = {'content-type': 'application/json'}
        return requests.post(url, remote_service='auto',
                             data=json.dumps(data), headers=headers).json()

    def as_api_dict(self):
        return {'id': self.slug,
                'text': self.label,
                'description': self.description}

    def compute_extra_fees(self, user):
        """Recompute the extra-fee basket items of ``user`` for this regie.

        The regular (non extra-fee) items still to be paid are posted to
        ``extra_fees_ws_url``; existing extra-fee items are then replaced by
        the ones returned. Nothing happens when no URL is configured; on a
        webservice error the failure is logged and existing items are kept.
        """
        if not self.extra_fees_ws_url:
            return
        basketitems = BasketItem.get_items_to_be_paid(user).filter(regie=self)
        post_data = {'data': [
            {
                'subject': basketitem.subject,
                'source_url': basketitem.source_url,
                'details': basketitem.details,
                'amount': str(basketitem.amount),
                'request_data': basketitem.request_data,
            }
            for basketitem in basketitems.filter(extra_fee=False)
        ]}
        if not post_data['data']:
            # nothing left to pay: extra fees have no reason to stay
            basketitems.filter(extra_fee=True).delete()
            return
        response = requests.post(
            self.extra_fees_ws_url,
            remote_service='auto',
            data=json.dumps(post_data),
            headers={'content-type': 'application/json'})
        # the body is only decoded for a 200 answer, and only once
        payload = response.json() if response.status_code == 200 else None
        if payload is None or payload.get('err'):
            logger = logging.getLogger(__name__)
            logger.error('failed to compute extra fees (user: %r)', user)
            return
        basketitems.filter(extra_fee=True).delete()
        # a missing or null "data" means "no extra fee", not a crash after the
        # previous fees have already been deleted
        for extra_fee in payload.get('data') or []:
            BasketItem(user=user, regie=self,
                       subject=extra_fee.get('subject'),
                       amount=extra_fee.get('amount'),
                       extra_fee=True,
                       user_cancellable=False).save()

    def get_remote_pending_invoices(self):
        """Return the ``data`` member of the pending-invoices webservice answer.

        notify_new_remote_invoices() consumes it as a mapping of user SAML
        name id to a dictionary holding an ``invoices`` list. An empty dict is
        returned when the regie is not remote, mellon is not installed or the
        request failed.
        """
        if not self.is_remote() or UserSAMLIdentifier is None:
            return {}
        url = self.webservice_url + '/users/with-pending-invoices/'
        response = requests.get(url, remote_service='auto', cache_duration=0,
                                log_errors=False, without_user=True)
        if not response.ok:
            return {}
        return response.json()['data']

    def get_notification_namespace(self):
        return 'invoice-%s' % self.slug

    def get_notification_id(self, invoice):
        return '%s:%s' % (self.get_notification_namespace(), invoice.id)

    def get_notification_reminder_id(self, invoice):
        return '%s:reminder-%s' % (self.get_notification_namespace(), invoice.id)

    def notify_invoice(self, user, invoice):
        """Create, refresh or forget the notification of ``invoice`` for ``user``.

        * past its payment limit date: both the notification and its reminder
          are forgotten;
        * otherwise a notification is (re)issued, as a reminder when the limit
          date is closer than LINGO_NEW_INVOICES_REMIND_DELTA days; an email is
          sent the first time a given notification id is used.

        Returns the notification id that was handled.
        """
        today = timezone.now().date()
        remind_delta = datetime.timedelta(days=settings.LINGO_NEW_INVOICES_REMIND_DELTA)
        active_items_cell = ActiveItems.objects.first()
        if active_items_cell:
            items_page_url = active_items_cell.page.get_online_url()
        else:
            items_page_url = ''
        notification_id = self.get_notification_id(invoice)
        notification_reminder_id = self.get_notification_reminder_id(invoice)
        if invoice.payment_limit_date < today:
            # invoice is out of date
            Notification.objects.find(user, notification_id).forget()
            Notification.objects.find(user, notification_reminder_id).forget()
        else:
            # invoice can be paid
            if invoice.payment_limit_date >= today + remind_delta:
                message = _('Invoice %s to pay') % invoice.subject
            else:
                message = _('Reminder: invoice %s to pay') % invoice.subject
                notification_id = notification_reminder_id
            if not Notification.objects.find(user, notification_id).exists():
                self.notify_remote_invoice_by_email(user, invoice)
            Notification.notify(user,
                                summary=message,
                                id=notification_id,
                                url=items_page_url,
                                end_timestamp=invoice.payment_limit_date)
        return notification_id

    def notify_new_remote_invoices(self):
        """Notify known users of their pending remote invoices.

        Invoices below ``payment_min_amount`` are skipped, as are name ids
        without a local user. Notifications of this regie's namespace that do
        not match a handled invoice are forgotten afterwards.
        """
        if UserSAMLIdentifier is None:
            # remote invoices retrieval requires SAML
            return
        pending_invoices = self.get_remote_pending_invoices()
        notification_ids = []
        for uuid, items in pending_invoices.items():
            try:
                user = UserSAMLIdentifier.objects.get(name_id=uuid).user
            except UserSAMLIdentifier.DoesNotExist:
                continue
            for invoice in items['invoices']:
                remote_invoice = build_remote_item(invoice, self)
                if remote_invoice.total_amount >= self.payment_min_amount:
                    notification_ids.append(
                        self.notify_invoice(user, remote_invoice))
        # clear old notifications for invoice not in the source anymore
        Notification.objects.namespace(self.get_notification_namespace())\
            .exclude(external_id__in=notification_ids) \
            .forget()

    def notify_remote_invoice_by_email(self, user, invoice):
        """Email ``user`` about ``invoice``, attaching its PDF when it has one."""
        subject_template = 'lingo/combo/invoice_email_notification_subject.txt'
        text_body_template = 'lingo/combo/invoice_email_notification_body.txt'
        html_body_template = 'lingo/combo/invoice_email_notification_body.html'
        payment_url = reverse('view-item', kwargs={'regie_id': self.id,
                                                   'item_crypto_id': invoice.crypto_id})
        ctx = settings.TEMPLATE_VARS.copy()
        ctx['invoice'] = invoice
        ctx['payment_url'] = urlparse.urljoin(settings.SITE_BASE_URL, payment_url)
        ctx['portal_url'] = settings.SITE_BASE_URL
        subject = render_to_string([subject_template], ctx).strip()
        text_body = render_to_string([text_body_template], ctx)
        html_body = render_to_string([html_body_template], ctx)
        message = EmailMultiAlternatives(subject, text_body, to=[user.email])
        message.attach_alternative(html_body, 'text/html')
        if invoice.has_pdf:
            invoice_pdf = self.get_invoice_pdf(user, invoice.id)
            message.attach('%s.pdf' % invoice.id, invoice_pdf.content, 'application/pdf')
        message.send()
class BasketItem(models.Model):
    """An item placed in a user's basket, to be paid to a regie."""

    user = models.ForeignKey(settings.AUTH_USER_MODEL, null=True)
    regie = models.ForeignKey(Regie)
    subject = models.CharField(verbose_name=_('Subject'), max_length=200)
    source_url = models.URLField(_('Source URL'), blank=True)
    details = models.TextField(verbose_name=_('Details'), blank=True)
    amount = models.DecimalField(verbose_name=_('Amount'),
                                 decimal_places=2, max_digits=8)
    request_data = JSONField(blank=True)
    extra_fee = models.BooleanField(default=False)
    user_cancellable = models.BooleanField(default=True)
    creation_date = models.DateTimeField(auto_now_add=True)
    cancellation_date = models.DateTimeField(null=True)
    waiting_date = models.DateTimeField(null=True)
    payment_date = models.DateTimeField(null=True)
    notification_date = models.DateTimeField(null=True)
    capture_date = models.DateField(null=True)

    class Meta:
        ordering = ['regie', 'extra_fee', 'subject']

    @classmethod
    def get_items_to_be_paid(cls, user):
        """Return the items of ``user`` that are neither paid, waiting nor cancelled."""
        pending = {
            'payment_date__isnull': True,
            'waiting_date__isnull': True,
            'cancellation_date__isnull': True,
        }
        return cls.objects.filter(user=user, **pending)

    def notify(self, status):
        """POST ``status`` to the trigger URL derived from ``source_url``.

        Nothing is sent when the item has no source URL. For the ``paid``
        status, details of the first accepted/paid transaction are included.
        HTTP errors are raised by ``raise_for_status``.
        """
        if not self.source_url:
            return
        trigger_path = 'jump/trigger/%s' % status
        url = self.source_url + trigger_path
        message = {'result': 'ok'}
        if status == 'paid':
            paid_statuses = (eopayment.ACCEPTED, eopayment.PAID)
            transaction = self.transaction_set.filter(status__in=paid_statuses)[0]
            message.update({
                'transaction_id': transaction.id,
                'order_id': transaction.order_id,
                'bank_transaction_id': transaction.bank_transaction_id,
                'bank_data': transaction.bank_data,
            })
        response = requests.post(
            url,
            remote_service='auto',
            data=json.dumps(message),
            headers={'content-type': 'application/json'},
            timeout=15)
        response.raise_for_status()

    def notify_payment(self):
        """Notify the source of the payment, then refresh the user's extra fees."""
        self.notify('paid')
        self.notification_date = timezone.now()
        self.save()
        self.regie.compute_extra_fees(user=self.user)

    def notify_cancellation(self, notify_origin=False):
        """Mark the item as cancelled, optionally telling its source first."""
        if notify_origin:
            self.notify('cancelled')
        self.cancellation_date = timezone.now()
        self.save()
        self.regie.compute_extra_fees(user=self.user)

    @property
    def total_amount(self):
        # same attribute name as RemoteItem.total_amount
        return self.amount
class RemoteItem(object):
    """Plain (non database) object describing an invoice of a remote regie."""

    # stays None unless a truthy payment date is given to the constructor
    payment_date = None

    def __init__(self, id, regie, creation_date, payment_limit_date,
                 total_amount, amount, display_id, subject, has_pdf,
                 online_payment, paid, payment_date, no_online_payment_reason):
        # values stored untouched
        self.id = id
        self.regie = regie
        self.subject = subject
        self.has_pdf = has_pdf
        self.online_payment = online_payment
        self.paid = paid
        self.no_online_payment_reason = no_online_payment_reason
        # the identifier doubles as display identifier when none is given
        self.display_id = display_id or self.id
        # values converted from their serialized form
        self.creation_date = dateparse.parse_date(creation_date)
        self.payment_limit_date = dateparse.parse_date(payment_limit_date)
        self.total_amount = Decimal(total_amount)
        self.amount = Decimal(amount)
        if payment_date:
            self.payment_date = parser.parse(payment_date)

    @property
    def no_online_payment_reason_details(self):
        """Message explaining why the invoice cannot be paid online.

        Messages from settings.LINGO_NO_ONLINE_PAYMENT_REASONS take precedence
        over the built-in ones.
        """
        reason = self.no_online_payment_reason
        builtin_messages = {
            'litigation': _('This invoice is in litigation.'),
            'autobilling': _('Autobilling has been set for this invoice.'),
            'past-due-date': _('Due date is over.'),
        }
        fallback = builtin_messages.get(reason)
        return settings.LINGO_NO_ONLINE_PAYMENT_REASONS.get(reason, fallback)

    @property
    def crypto_id(self):
        """The identifier, as a string, encrypted with the site SECRET_KEY."""
        plain_id = str(self.id)
        return aes_hex_encrypt(settings.SECRET_KEY, plain_id)
class Transaction(models.Model):
    """A payment attempt, covering local basket items and/or remote items."""

    regie = models.ForeignKey(Regie, null=True)
    items = models.ManyToManyField(BasketItem, blank=True)
    # comma-separated identifiers of remote items
    remote_items = models.CharField(max_length=512)
    # comma-separated identifiers of remote items whose payment notification failed
    to_be_paid_remote_items = models.CharField(max_length=512, null=True)
    start_date = models.DateTimeField(auto_now_add=True)
    end_date = models.DateTimeField(null=True)
    bank_data = JSONField(blank=True)
    order_id = models.CharField(max_length=200)
    bank_transaction_id = models.CharField(max_length=200, null=True)
    user = models.ForeignKey(settings.AUTH_USER_MODEL, null=True)
    status = models.IntegerField(null=True)
    amount = models.DecimalField(default=0, max_digits=7, decimal_places=2)

    def is_remote(self):
        return self.remote_items != ''

    def get_user_name(self):
        """Full name of the paying user, or a placeholder when there is none."""
        if not self.user:
            return _('Anonymous User')
        return self.user.get_full_name()

    def is_paid(self):
        return self.status in (eopayment.PAID, eopayment.ACCEPTED)

    def get_status_label(self):
        """Human readable label of ``status``; unknown values give 'Unknown'."""
        labels = {
            0: _('Running'),
            eopayment.PAID: _('Paid'),
            eopayment.ACCEPTED: _('Paid (accepted)'),
            eopayment.CANCELLED: _('Cancelled'),
            EXPIRED: _('Expired')
        }
        return labels.get(self.status) or _('Unknown')

    def first_notify_remote_items_of_payments(self):
        self.notify_remote_items_of_payments(self.remote_items)

    def retry_notify_remote_items_of_payments(self):
        self.notify_remote_items_of_payments(self.to_be_paid_remote_items)

    def notify_remote_items_of_payments(self, items):
        """Report the payment of the given remote items to the regie.

        ``items`` is a comma-separated string of remote item identifiers.
        Nothing is done for an empty value or an unpaid transaction. For each
        item successfully reported, a paid local BasketItem is created and
        attached to the transaction; identifiers that failed are stored in
        ``to_be_paid_remote_items``.
        """
        logger = logging.getLogger(__name__)
        if not items or not self.is_paid():
            return
        regie = self.regie
        failed_ids = []
        for item_id in items.split(','):
            try:
                remote_item = regie.get_invoice(user=self.user, invoice_id=item_id)
                regie.pay_invoice(item_id, self.order_id, self.end_date)
            except Exception:
                failed_ids.append(item_id)
                logger.exception(u'unable to notify payment for remote item %s from transaction %s',
                                 item_id, self)
                continue
            logger.info(u'notified payment for remote item %s from transaction %s',
                        item_id, self)
            # keep a local trace of the remote invoice that has just been paid
            local_item = BasketItem.objects.create(
                user=self.user,
                regie=regie,
                source_url='',
                subject=_('Invoice #%s') % remote_item.display_id,
                amount=remote_item.amount,
                payment_date=self.end_date)
            self.items.add(local_item)
        self.to_be_paid_remote_items = ','.join(failed_ids) or None
        self.save(update_fields=['to_be_paid_remote_items'])
class TransactionOperation(models.Model):
    """An operation (validation or cancellation) recorded for a transaction."""

    OPERATIONS = [
        ('validation', _('Validation')),
        ('cancellation', _('Cancellation')),
    ]

    transaction = models.ForeignKey(Transaction)
    kind = models.CharField(choices=OPERATIONS, max_length=65)
    amount = models.DecimalField(max_digits=8, decimal_places=2)
    creation_date = models.DateTimeField(auto_now_add=True)
    bank_result = JSONField(blank=True)
@register_cell_class
class LingoBasketCell(CellBase):
    """Cell displaying the items a user still has to pay, grouped by regie."""

    user_dependant = True

    class Meta:
        verbose_name = _('Basket')

    class Media:
        js = ('xstatic/jquery-ui.min.js', 'js/gadjo.js',)

    @classmethod
    def is_enabled(cls):
        return Regie.objects.count() > 0

    def is_relevant(self, context):
        """Relevant for an authenticated user with at least one item to pay."""
        request = context['request']
        if not (getattr(request, 'user', None) and request.user.is_authenticated()):
            return False
        return BasketItem.get_items_to_be_paid(request.user).count() > 0

    def get_badge(self, context):
        """Return ``{'badge': <localized total>}``, or None when there is nothing to pay."""
        request = context['request']
        if not (getattr(request, 'user', None) and request.user.is_authenticated()):
            return
        items = BasketItem.get_items_to_be_paid(request.user)
        if not items:
            return
        total = sum([item.amount for item in items])
        # display whole amounts without decimals
        if total == int(total):
            total = int(total)
        return {'badge': _(u'%s') % localize(total)}

    def render(self, context):
        """Render the basket template with items and totals grouped per regie."""
        basket_template = template.loader.get_template('lingo/combo/basket.html')
        regies = {}
        for item in BasketItem.get_items_to_be_paid(context['request'].user):
            if item.regie_id not in regies:
                regies[item.regie_id] = {'items': [], 'regie': item.regie}
            regies[item.regie_id]['items'].append(item)
        for group in regies.values():
            group['total'] = sum([item.amount for item in group['items']])
        context['regies'] = sorted(regies.values(), key=lambda group: group['regie'].label)
        return basket_template.render(context)
@register_cell_class
class LingoRecentTransactionsCell(CellBase):
    """Cell listing the transactions a user started during the last 7 days."""

    user_dependant = True

    class Meta:
        verbose_name = _('Recent Transactions')

    @classmethod
    def is_enabled(cls):
        return Regie.objects.count() > 0

    def _recent_transactions(self, user):
        """Return the queryset of ``user``'s transactions started within 7 days."""
        since = timezone.now() - datetime.timedelta(days=7)
        return Transaction.objects.filter(user=user, start_date__gte=since)

    def is_relevant(self, context):
        """Relevant for an authenticated user having at least one recent transaction."""
        request = context['request']
        if not (getattr(request, 'user', None) and request.user.is_authenticated()):
            return False
        # exists() asks the database for a single row instead of loading
        # every recent transaction just to count them
        return self._recent_transactions(request.user).exists()

    def render(self, context):
        """Render the recent transactions template, newest transaction first."""
        recent_transactions_template = template.loader.get_template(
            'lingo/combo/recent_transactions.html')
        context['transactions'] = self._recent_transactions(
            context['request'].user).order_by('-start_date')
        return recent_transactions_template.render(context)
@register_cell_class
class LingoBasketLinkCell(CellBase):
    """Cell displaying a link to the page holding the basket cell."""

    user_dependant = True

    class Meta:
        verbose_name = _('Basket Link')

    @classmethod
    def is_enabled(cls):
        return Regie.objects.count() > 0

    def is_relevant(self, context):
        """Relevant for an authenticated user with at least one item to pay."""
        request = context['request']
        if not (getattr(request, 'user', None) and request.user.is_authenticated()):
            return False
        return BasketItem.get_items_to_be_paid(request.user).count() > 0

    def render(self, context):
        """Render the link; an empty string is returned for anonymous users
        or when no basket cell exists."""
        request = context['request']
        if not (getattr(request, 'user', None) and request.user.is_authenticated()):
            return ''
        try:
            context['basket_url'] = LingoBasketCell.objects.all()[0].page.get_online_url()
        except IndexError:
            return ''
        basket_template = template.loader.get_template('lingo/combo/basket_link.html')
        pending_items = BasketItem.get_items_to_be_paid(user=request.user)
        context['items'] = pending_items
        context['total'] = sum([item.amount for item in pending_items])
        return basket_template.render(context)
class Items(CellBase):
    """Abstract base of the cells listing invoices of remote regies.

    Subclasses provide the invoices through get_invoices().
    """

    # slug of the regie to query; empty means all regies
    regie = models.CharField(_('Regie'), max_length=50, blank=True)
    title = models.CharField(_('Title'), max_length=200, blank=True)
    text = RichTextField(_('Text'), blank=True, null=True)

    user_dependant = True
    template_name = 'lingo/combo/items.html'
    loading_message = _('Loading invoices...')

    class Meta:
        abstract = True

    class Media:
        js = ('xstatic/jquery-ui.min.js', 'js/gadjo.js',)

    @classmethod
    def is_enabled(cls):
        """Enabled as soon as one regie has a webservice URL."""
        return Regie.objects.exclude(webservice_url='').count() > 0

    def is_relevant(self, context):
        request = context['request']
        return (getattr(request, 'user', None) and request.user.is_authenticated())

    def get_default_form_class(self):
        """Build the edit form; a regie selector is added when several
        remote regies exist."""
        remote_regies = Regie.objects.exclude(webservice_url='')
        fields = ['title', 'text']
        widgets = {}
        if remote_regies.count() > 1:
            choices = [('', _('All'))]
            choices.extend([(regie.slug, regie.label) for regie in remote_regies])
            widgets['regie'] = Select(choices=choices)
            fields.insert(0, 'regie')
        return model_forms.modelform_factory(self.__class__, fields=fields, widgets=widgets)

    def get_regies(self):
        """Return the configured regie alone, or every regie when none is set."""
        if not self.regie:
            return Regie.objects.all()
        return [Regie.objects.get(slug=self.regie)]

    def get_invoices(self, user):
        return []

    def get_cell_extra_context(self, context):
        """Add title, text and the invoices (most recent first) to the context."""
        ctx = super(Items, self).get_cell_extra_context(context)
        if context.get('placeholder_search_mode'):
            # don't call webservices when we're just looking for placeholders
            return ctx
        ctx.update({'title': self.title, 'text': self.text})
        invoices = self.get_invoices(user=context['user'])
        invoices.sort(key=lambda invoice: invoice.creation_date, reverse=True)
        ctx.update({'items': invoices})
        return ctx

    def render(self, context):
        """Render only in synchronous mode; otherwise raise NothingInCacheException."""
        self.context = context
        if context.get('synchronous'):
            return super(Items, self).render(context)
        raise NothingInCacheException()
@register_cell_class
class ItemsHistory(Items):
    """Cell listing the invoice history of the selected regies."""

    class Meta:
        verbose_name = _('Items History Cell')

    def get_invoices(self, user):
        """Return the history invoices of ``user``, all regies concatenated."""
        return [invoice
                for regie in self.get_regies()
                for invoice in regie.get_invoices(user, history=True)]
@register_cell_class
class ActiveItems(Items):
    """Cell listing the current invoices of the selected regies."""

    class Meta:
        verbose_name = _('Active Items Cell')

    def get_invoices(self, user):
        """Return the current invoices of ``user``, all regies concatenated."""
        return [invoice
                for regie in self.get_regies()
                for invoice in regie.get_invoices(user)]
@register_cell_class
class SelfDeclaredInvoicePayment(Items):
    """Cell rendered synchronously and independently of the current user."""

    user_dependant = False
    template_name = 'lingo/combo/self-declared-invoice-payment.html'

    class Meta:
        verbose_name = _('Self declared invoice payment')

    def is_relevant(self, context):
        return self.is_enabled()

    def render(self, context):
        """Force synchronous rendering and expose the request path to the template."""
        context['synchronous'] = True
        context['page_path'] = context['request'].path
        parent = super(SelfDeclaredInvoicePayment, self)
        return parent.render(context)
# (identifier, label) choices of TipiPaymentFormCell.control_protocol.
# NOTE: the misspelled constant name is kept as is, other modules may import it.
TIPI_CONTROL_PROCOTOLS = (
    ('pesv2', _('Indigo/PES v2')),
    ('rolmre', _('ROLMRE')),
)
@register_cell_class
class TipiPaymentFormCell(CellBase):
    """Cell displaying a form that sends the user to a TIPI payment service."""

    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.URLField(_('TIPI payment service URL'),
                          default='https://www.tipi.budget.gouv.fr/tpa/paiement.web')
    regies = models.CharField(_('Regies'), help_text=_('separated by commas'),
                              max_length=256)
    control_protocol = models.CharField(_('Control protocol'), max_length=8,
                                        choices=TIPI_CONTROL_PROCOTOLS, default='pesv2')
    exer = models.CharField('Exer', max_length=4, blank=True,
                            help_text=_('Default value to be used in form'))
    idpce = models.CharField('IDPCE', max_length=8, blank=True,
                             help_text=_('Default value to be used in form'))
    idligne = models.CharField('IDLIGNE', max_length=6, blank=True,
                               help_text=_('Default value to be used in form'))
    rolrec = models.CharField('ROLREC', max_length=2, blank=True,
                              help_text=_('Default value to be used in form'))
    roldeb = models.CharField('ROLDEB', max_length=2, blank=True,
                              help_text=_('Default value to be used in form'))
    roldet = models.CharField('ROLDET', max_length=13, blank=True,
                              help_text=_('Default value to be used in form'))
    test_mode = models.BooleanField(_('Test mode'), default=False)

    template_name = 'lingo/tipi_form.html'

    class Meta:
        verbose_name = _('TIPI Payment Form')

    class Media:
        js = ('js/tipi.js',)

    def get_cell_extra_context(self, context):
        """Fill ``context`` with what the TIPI form template needs.

        The values (title, url, mode, control protocol, regies and the
        description of the reference fields) are set on ``context`` itself;
        the returned value is the parent's extra context, unchanged.
        """
        extra_context = super(TipiPaymentFormCell, self).get_cell_extra_context(context)
        form_fields = self.get_default_form_class().base_fields
        # reference fields, grouped by the control protocol they apply to
        fields_by_protocol = (
            ('any', ['exer']),
            ('pesv2', ['idligne', 'idpce']),
            ('rolmre', ['rolrec', 'roldeb', 'roldet']),
        )
        reference_fields = []
        for protocol, names in fields_by_protocol:
            for name in names:
                max_length = form_fields[name].max_length
                description = {
                    'name': name,
                    'length': max_length,
                    'placeholder': '0' * max_length,
                    # special pattern for rolrec
                    'pattern': '[A-Z0-9]+' if name == 'rolrec' else '[0-9]+',
                    'protocol': protocol,
                }
                # a default is only advertised when the cell defines one
                if getattr(self, name):
                    description['default'] = getattr(self, name)
                reference_fields.append(description)
        context['title'] = self.title
        context['url'] = self.url
        context['mode'] = 'T' if self.test_mode else 'M'
        context['control_protocol'] = self.control_protocol
        context['regies'] = [
            chunk.strip() for chunk in self.regies.split(',') if chunk.strip()
        ]
        context['reference_fields'] = reference_fields
        return extra_context