remove obsolete payment stuff (#64972)

This commit is contained in:
Frédéric Péters 2022-05-06 22:51:38 +02:00
parent ac8ad0ec03
commit 7423a58599
8 changed files with 19 additions and 1393 deletions

View File

@ -7,7 +7,6 @@ from wcs.qommon.misc import get_cfg
from modules import admin
from modules import backoffice
from modules import categories_admin
from modules import payments_ui
from modules import formpage
from modules import template
from modules import root
@ -21,9 +20,6 @@ rdb = get_publisher_class().backoffice_directory_class
rdb.items = []
rdb.register_directory('payments', payments_ui.PaymentsDirectory())
rdb.register_menu_item('payments/', _('Payments'))
rdb.register_directory('settings', admin.SettingsDirectory())
import wcs.admin.forms

View File

@ -18,25 +18,6 @@ from wcs.qommon.form import *
CURRENT_USER = object()
def check_visibility(target, user=CURRENT_USER):
if not get_publisher().has_site_option('auquotidien-%s' % target):
# option not explicitely enabled, -> off.
return False
if user is CURRENT_USER:
user = get_request().user
if not user:
return False
target = target.strip('/')
if target == 'management':
target = 'forms'
admin_role = get_cfg('aq-permissions', {}).get(target, None)
if not admin_role:
return False
if not (user.is_admin or admin_role in (user.roles or [])):
return False
return True
class BackofficeRootDirectory(wcs.backoffice.root.RootDirectory):
def get_intro_text(self):
return _('Welcome on Publik back office interface')

View File

@ -26,82 +26,6 @@ from wcs.qommon.admin.texts import TextsDirectory
from wcs.formdef import FormDef
import wcs.myspace
from .payments import Invoice, Regie, is_payment_supported
class MyInvoicesDirectory(Directory):
_q_exports = ['']
def _q_traverse(self, path):
if not is_payment_supported():
raise errors.TraversalError()
get_response().breadcrumb.append(('invoices/', _('Invoices')))
return Directory._q_traverse(self, path)
def _q_index(self):
user = get_request().user
if not user or user.anonymous:
raise errors.AccessUnauthorizedError()
template.html_top(_('Invoices'))
r = TemplateIO(html=True)
r += TextsDirectory.get_html_text('aq-myspace-invoice')
r += get_session().display_message()
invoices = []
invoices.extend(Invoice.get_with_indexed_value(str('user_id'), str(user.id)))
invoices.sort(key=lambda x: (str(x.regie_id), -x.date.toordinal()))
last_regie_id = None
unpaid = False
for invoice in invoices:
if invoice.regie_id != last_regie_id:
if last_regie_id:
r += htmltext('</ul>')
if unpaid:
r += htmltext('<input type="submit" value="%s"/>') % _('Pay Selected Invoices')
r += htmltext('</form>')
last_regie_id = invoice.regie_id
r += htmltext('<h3>%s</h3>') % Regie.get(last_regie_id).label
unpaid = False
r += htmltext('<form action="%s/invoices/multiple">' % get_publisher().get_frontoffice_url())
r += htmltext('<ul>')
r += htmltext('<li>')
if not (invoice.paid or invoice.canceled):
r += htmltext('<input type="checkbox" name="invoice" value="%s"/>' % invoice.id)
unpaid = True
r += misc.localstrftime(invoice.date)
r += ' - '
r += '%s' % invoice.subject
r += ' - '
r += '%s' % invoice.amount
r += htmltext(' &euro;')
r += ' - '
button = '<span class="paybutton">%s</span>' % _('Pay')
if invoice.canceled:
r += _('canceled on %s') % misc.localstrftime(invoice.canceled_date)
r += ' - '
button = _('Details')
if invoice.paid:
r += _('paid on %s') % misc.localstrftime(invoice.paid_date)
r += ' - '
button = _('Details')
r += htmltext(
'<a href="%s/invoices/%s">%s</a>'
% (get_publisher().get_frontoffice_url(), invoice.id, button)
)
r += htmltext('</li>')
if last_regie_id:
r += htmltext('</ul>')
if unpaid:
r += htmltext('<input type="submit" value="%s"/>') % _('Pay Selected Invoices')
r += htmltext('</form>')
return r.getvalue()
class JsonDirectory(Directory):
"""Export of several lists in json, related to the current user or the
@ -145,9 +69,8 @@ class JsonDirectory(Directory):
class MyspaceDirectory(wcs.myspace.MyspaceDirectory):
_q_exports = ['', 'profile', 'new', 'password', 'remove', 'drafts', 'forms', 'invoices', 'json']
_q_exports = ['', 'profile', 'new', 'password', 'remove', 'drafts', 'forms', 'json']
invoices = MyInvoicesDirectory()
json = JsonDirectory()
def _q_traverse(self, path):
@ -189,8 +112,6 @@ class MyspaceDirectory(wcs.myspace.MyspaceDirectory):
profile_links.append('<a href="#my-profile">%s</a>' % _('My Profile'))
if user_forms:
profile_links.append('<a href="#my-forms">%s</a>' % _('My Forms'))
if is_payment_supported():
profile_links.append('<a href="invoices/">%s</a>' % _('My Invoices'))
root_url = get_publisher().get_root_url()
if user.can_go_in_backoffice():
@ -399,6 +320,3 @@ class MyspaceDirectory(wcs.myspace.MyspaceDirectory):
template.html_top(_('Removing Account'))
return form.render()
TextsDirectory.register('aq-myspace-invoice', N_('Message on top of invoices page'), category=N_('Invoices'))

View File

@ -1,221 +1,23 @@
import random
import string
from datetime import datetime as dt
import hashlib
import time
from decimal import Decimal
import urllib.parse
from quixote import redirect, get_publisher, get_request, get_session, get_response
from quixote.directory import Directory
from quixote.errors import QueryError
from quixote.html import TemplateIO, htmltext
if not set:
from sets import Set as set
eopayment = None
try:
import eopayment
except ImportError:
pass
from wcs.qommon import _, N_
from wcs.qommon import force_str
from wcs.qommon import errors, get_logger, get_cfg, emails
from wcs.qommon.storage import StorableObject
from wcs.qommon.form import htmltext, StringWidget, TextWidget, SingleSelectWidget, WidgetDict
from wcs.qommon.misc import simplify
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs.workflows import EvolutionPart, WorkflowStatusItem, register_item_class, template_on_formdata
def is_payment_supported():
if not eopayment:
return False
return get_cfg('aq-permissions', {}).get('payments', None) is not None
from wcs.workflows import EvolutionPart, WorkflowStatusItem, register_item_class
class Regie(StorableObject):
_names = 'regies'
label = None
description = None
service = None
service_options = None
def get_payment_object(self):
return eopayment.Payment(kind=self.service, options=self.service_options)
class Invoice(StorableObject):
_names = 'invoices'
_hashed_indexes = ['user_id', 'regie_id']
_indexes = ['external_id']
user_id = None
regie_id = None
formdef_id = None
formdata_id = None
subject = None
details = None
amount = None
date = None
paid = False
paid_date = None
canceled = False
canceled_date = None
canceled_reason = None
next_status = None
external_id = None
request_kwargs = {}
def __init__(self, id=None, regie_id=None, formdef_id=None):
self.id = id
self.regie_id = regie_id
self.formdef_id = formdef_id
if get_publisher() and not self.id:
self.id = self.get_new_id()
def get_user(self):
if self.user_id:
return get_publisher().user_class.get(self.user_id, ignore_errors=True)
return None
@property
def username(self):
user = self.get_user()
return user.name if user else ''
def get_new_id(self, create=False):
# format : date-regie-formdef-alea-check
r = random.SystemRandom()
self.fresh = True
while True:
id = '-'.join(
[
dt.now().strftime('%Y%m%d'),
'r%s' % (self.regie_id or 'x'),
'f%s' % (self.formdef_id or 'x'),
''.join([r.choice(string.digits) for x in range(5)]),
]
)
crc = '%0.2d' % (hashlib.md5(id.encode('utf-8')).digest()[0] % 100)
id = id + '-' + crc
if not self.has_key(id):
return id
def store(self, *args, **kwargs):
if getattr(self, 'fresh', None) is True:
del self.fresh
notify_new_invoice(self)
return super(Invoice, self).store(*args, **kwargs)
def check_crc(cls, id):
try:
return int(id[-2:]) == (hashlib.md5(id[:-3].encode('utf-8')).digest()[0] % 100)
except:
return False
check_crc = classmethod(check_crc)
def pay(self):
self.paid = True
self.paid_date = dt.now()
self.store()
get_logger().info(_('invoice %s paid'), self.id)
notify_paid_invoice(self)
def unpay(self):
self.paid = False
self.paid_date = None
self.store()
get_logger().info(_('invoice %s unpaid'), self.id)
def cancel(self, reason=None):
self.canceled = True
self.canceled_date = dt.now()
if reason:
self.canceled_reason = reason
self.store()
notify_canceled_invoice(self)
get_logger().info(_('invoice %s canceled'), self.id)
def payment_url(self):
base_url = get_publisher().get_frontoffice_url()
return '%s/invoices/%s' % (base_url, self.id)
INVOICE_EVO_VIEW = {
'create': N_('Create Invoice <a href="%(url)s">%(id)s</a>: %(subject)s - %(amount)s &euro;'),
'pay': N_(
'Invoice <a href="%(url)s">%(id)s</a> is paid with transaction number %(transaction_order_id)s'
),
'cancel': N_('Cancel Invoice <a href="%(url)s">%(id)s</a>'),
'try': N_(
'Try paying invoice <a href="%(url)s">%(id)s</a> with transaction number %(transaction_order_id)s'
),
}
class InvoiceEvolutionPart(EvolutionPart):
action = None
id = None
subject = None
amount = None
transaction = None
def __init__(self, action, invoice, transaction=None):
self.action = action
self.id = invoice.id
self.subject = invoice.subject
self.amount = invoice.amount
self.transaction = transaction
def view(self):
vars = {
'url': '%s/invoices/%s' % (get_publisher().get_frontoffice_url(), self.id),
'id': self.id,
'subject': self.subject,
'amount': self.amount,
}
if not self.action:
return ''
if self.transaction:
vars['transaction_order_id'] = self.transaction.order_id
return htmltext(
'<p class="invoice-%s">' % self.action + _(INVOICE_EVO_VIEW[self.action]) % vars + '</p>'
)
pass
class Transaction(StorableObject):
_names = 'transactions'
_hashed_indexes = ['invoice_ids']
_indexes = ['order_id']
invoice_ids = None
order_id = None
start = None
end = None
bank_data = None
def __init__(self, *args, **kwargs):
self.invoice_ids = list()
StorableObject.__init__(self, *args, **kwargs)
def get_new_id(cls, create=False):
r = random.SystemRandom()
while True:
id = ''.join([r.choice(string.digits) for x in range(16)])
if not cls.has_key(id):
return id
get_new_id = classmethod(get_new_id)
class PaymentWorkflowStatusItem(WorkflowStatusItem):
@ -223,105 +25,19 @@ class PaymentWorkflowStatusItem(WorkflowStatusItem):
key = 'payment'
endpoint = False
category = 'interaction'
support_substitution_variables = True
subject = None
details = None
amount = None
regie_id = None
next_status = None
request_kwargs = {}
@classmethod
def is_available(self, workflow=None):
return is_payment_supported()
is_available = classmethod(is_available)
return False
def render_as_line(self):
if self.regie_id:
try:
return _('Payable to %s' % Regie.get(self.regie_id).label)
except KeyError:
return _('Payable (not completed)')
else:
return _('Payable (not completed)')
return _('Payable (obsolete)')
def get_parameters(self):
return ('subject', 'details', 'amount', 'regie_id', 'next_status', 'request_kwargs')
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
if 'subject' in parameters:
form.add(StringWidget, '%ssubject' % prefix, title=_('Subject'), value=self.subject, size=40)
if 'details' in parameters:
form.add(
TextWidget, '%sdetails' % prefix, title=_('Details'), value=self.details, cols=80, rows=10
)
if 'amount' in parameters:
form.add(StringWidget, '%samount' % prefix, title=_('Amount'), value=self.amount)
if 'regie_id' in parameters:
form.add(
SingleSelectWidget,
'%sregie_id' % prefix,
title=_('Regie'),
value=self.regie_id,
options=[(None, '---')] + [(x.id, x.label) for x in Regie.select()],
)
if 'next_status' in parameters:
form.add(
SingleSelectWidget,
'%snext_status' % prefix,
title=_('Status after validation'),
value=self.next_status,
hint=_(
'Used only if the current status of the form does not contain any "Payment Validation" item'
),
options=[(None, '---')] + [(x.id, x.name) for x in self.parent.parent.possible_status],
)
if 'request_kwargs' in parameters:
keys = ['name', 'email', 'address', 'phone', 'info1', 'info2', 'info3']
hint = ''
hint += _('If the value starts by = it will be ' 'interpreted as a Python expression.')
hint += ' '
hint += _('Standard keys are: %s.') % (', '.join(keys))
form.add(
WidgetDict,
'request_kwargs',
title=_('Parameters for the payment system'),
hint=hint,
value=self.request_kwargs,
)
return ()
def perform(self, formdata):
invoice = Invoice(regie_id=self.regie_id, formdef_id=formdata.formdef.id)
invoice.user_id = formdata.user_id
invoice.formdata_id = formdata.id
invoice.next_status = self.next_status
if self.subject:
invoice.subject = template_on_formdata(formdata, self.compute(self.subject))
else:
invoice.subject = _('%(form_name)s #%(formdata_id)s') % {
'form_name': formdata.formdef.name,
'formdata_id': formdata.id,
}
invoice.details = template_on_formdata(formdata, self.compute(self.details))
invoice.amount = Decimal(self.compute(self.amount))
invoice.date = dt.now()
invoice.request_kwargs = {}
if self.request_kwargs:
for key, value in self.request_kwargs.items():
invoice.request_kwargs[key] = self.compute(value)
invoice.store()
# add a message in formdata.evolution
evo = Evolution()
evo.time = time.localtime()
evo.status = formdata.status
evo.add_part(InvoiceEvolutionPart('create', invoice))
if not formdata.evolution:
formdata.evolution = []
formdata.evolution.append(evo)
formdata.store()
# redirect the user to "my invoices"
return get_publisher().get_frontoffice_url() + '/myspace/invoices/'
pass
register_item_class(PaymentWorkflowStatusItem)
@ -333,313 +49,41 @@ class PaymentCancelWorkflowStatusItem(WorkflowStatusItem):
endpoint = False
category = 'interaction'
reason = None
regie_id = None
@classmethod
def is_available(self, workflow=None):
return is_payment_supported()
is_available = classmethod(is_available)
return False
def render_as_line(self):
if self.regie_id:
if self.regie_id == '_all':
return _('Cancel all Payments')
else:
try:
return _('Cancel Payments for %s' % Regie.get(self.regie_id).label)
except KeyError:
return _('Cancel Payments (non completed)')
else:
return _('Cancel Payments (non completed)')
return _('Cancel Payments (obsolete)')
def get_parameters(self):
return ('reason', 'regie_id')
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
if 'reason' in parameters:
form.add(StringWidget, '%sreason' % prefix, title=_('Reason'), value=self.reason, size=40)
if 'regie_id' in parameters:
form.add(
SingleSelectWidget,
'%sregie_id' % prefix,
title=_('Regie'),
value=self.regie_id,
options=[(None, '---'), ('_all', _('All Regies'))]
+ [(x.id, x.label) for x in Regie.select()],
)
return ()
def perform(self, formdata):
invoices_id = []
# get all invoices for the formdata and the selected regie
for evo in [evo for evo in formdata.evolution if evo.parts]:
for part in [part for part in evo.parts if isinstance(part, InvoiceEvolutionPart)]:
if part.action == 'create':
invoices_id.append(part.id)
elif part.id in invoices_id:
invoices_id.remove(part.id)
invoices = [Invoice.get(id) for id in invoices_id]
# select invoices for the selected regie (if not "all regies")
if self.regie_id != '_all':
invoices = [i for i in invoices if i.regie_id == self.regie_id]
# security filter: check user
invoices = [i for i in invoices if i.user_id == formdata.user_id]
# security filter: check formdata & formdef
invoices = [
i for i in invoices if (i.formdata_id == formdata.id) and (i.formdef_id == formdata.formdef.id)
]
evo = Evolution()
evo.time = time.localtime()
for invoice in invoices:
if not (invoice.paid or invoice.canceled):
invoice.cancel(self.reason)
evo.add_part(InvoiceEvolutionPart('cancel', invoice))
if not formdata.evolution:
formdata.evolution = []
formdata.evolution.append(evo)
formdata.store()
return get_publisher().get_frontoffice_url() + '/myspace/invoices/'
pass
register_item_class(PaymentCancelWorkflowStatusItem)
def request_payment(invoice_ids, url, add_regie=True):
for invoice_id in invoice_ids:
if not Invoice.check_crc(invoice_id):
raise QueryError()
invoices = [Invoice.get(invoice_id) for invoice_id in invoice_ids]
invoices = [i for i in invoices if not (i.paid or i.canceled)]
regie_ids = set([invoice.regie_id for invoice in invoices])
# Do not apply if more than one regie is used or no invoice is not paid or canceled
if len(invoices) == 0 or len(regie_ids) != 1:
url = get_publisher().get_frontoffice_url()
if get_session().user:
# FIXME: add error messages
url += '/myspace/invoices/'
return redirect(url)
if add_regie:
url = '%s%s' % (url, list(regie_ids)[0])
transaction = Transaction()
transaction.store()
transaction.invoice_ids = invoice_ids
transaction.start = dt.now()
amount = Decimal(0)
for invoice in invoices:
amount += Decimal(invoice.amount)
regie = Regie.get(invoice.regie_id)
payment = regie.get_payment_object()
# initialize request_kwargs using informations from the first invoice
# and update using current user informations
request_kwargs = getattr(invoices[0], 'request_kwargs', {})
request = get_request()
if request.user and request.user.email:
request_kwargs['email'] = request.user.email
if request.user and request.user.display_name:
request_kwargs['name'] = simplify(request.user.display_name)
(order_id, kind, data) = payment.request(amount, next_url=url, **request_kwargs)
transaction.order_id = order_id
transaction.store()
for invoice in invoices:
if invoice.formdef_id and invoice.formdata_id:
formdef = FormDef.get(invoice.formdef_id)
formdata = formdef.data_class().get(invoice.formdata_id)
evo = Evolution()
evo.time = time.localtime()
evo.status = formdata.status
evo.add_part(InvoiceEvolutionPart('try', invoice, transaction=transaction))
if not formdata.evolution:
formdata.evolution = []
formdata.evolution.append(evo)
formdata.store()
if kind == eopayment.URL:
return redirect(force_str(data))
elif kind == eopayment.FORM:
return return_eopayment_form(data)
else:
raise NotImplementedError()
def return_eopayment_form(form):
r = TemplateIO(html=True)
r += htmltext('<html><body onload="document.payform.submit()">')
r += htmltext('<form action="%s" method="%s" name="payform">') % (
force_str(form.url),
force_str(form.method),
)
for field in form.fields:
r += htmltext('<input type="%s" name="%s" value="%s"/>') % (
force_str(field['type']),
force_str(field['name']),
force_str(field['value']),
)
r += htmltext('<input type="submit" name="submit" value="%s"/>') % _('Pay')
r += htmltext('</body></html>')
return r.getvalue()
class PaymentValidationWorkflowStatusItem(WorkflowStatusItem):
description = N_('Payment Validation')
key = 'payment-validation'
endpoint = False
category = 'interaction'
next_status = None
@classmethod
def is_available(self, workflow=None):
return is_payment_supported()
is_available = classmethod(is_available)
return False
def render_as_line(self):
return _('Wait for payment validation')
return _('Wait for payment validation (obsolete)')
def get_parameters(self):
return ('next_status',)
return ()
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
if 'next_status' in parameters:
form.add(
SingleSelectWidget,
'%snext_status' % prefix,
title=_('Status once validated'),
value=self.next_status,
options=[(None, '---')] + [(x.id, x.name) for x in self.parent.parent.possible_status],
)
def perform(self, formdata):
pass
register_item_class(PaymentValidationWorkflowStatusItem)
class PublicPaymentRegieBackDirectory(Directory):
def __init__(self, asynchronous):
self.asynchronous = asynchronous
def _q_lookup(self, component):
logger = get_logger()
request = get_request()
query_string = get_request().get_query()
if request.get_method() == 'POST' and query_string == '':
query_string = urllib.parse.urlencode(request.form)
try:
regie = Regie.get(component)
except KeyError:
raise errors.TraversalError()
if self.asynchronous:
logger.debug('received asynchronous notification %r' % query_string)
payment = regie.get_payment_object()
payment_response = payment.response(query_string)
logger.debug('payment response %r', payment_response)
order_id = payment_response.order_id
bank_data = payment_response.bank_data
transaction = Transaction.get_on_index(order_id, 'order_id', ignore_errors=True)
if transaction is None:
raise errors.TraversalError()
commit = False
if not transaction.end:
commit = True
transaction.end = dt.now()
transaction.bank_data = bank_data
transaction.store()
if payment_response.signed and payment_response.is_paid() and commit:
logger.info(
'transaction %s successful, bankd_id:%s bank_data:%s'
% (order_id, payment_response.transaction_id, bank_data)
)
for invoice_id in transaction.invoice_ids:
# all invoices are now paid
invoice = Invoice.get(invoice_id)
invoice.pay()
# workflow for each related formdata
if invoice.formdef_id and invoice.formdata_id:
next_status = invoice.next_status
formdef = FormDef.get(invoice.formdef_id)
formdata = formdef.data_class().get(invoice.formdata_id)
wf_status = formdata.get_status()
for item in wf_status.items:
if isinstance(item, PaymentValidationWorkflowStatusItem):
next_status = item.next_status
break
if next_status is not None:
formdata.status = 'wf-%s' % next_status
evo = Evolution()
evo.time = time.localtime()
evo.status = formdata.status
evo.add_part(InvoiceEvolutionPart('pay', invoice, transaction=transaction))
if not formdata.evolution:
formdata.evolution = []
formdata.evolution.append(evo)
formdata.store()
# performs the items of the new status
formdata.perform_workflow()
elif payment_response.is_error() and commit:
logger.info('transaction %s finished with failure, bank_data:%s' % (order_id, bank_data))
elif commit:
logger.info('transaction %s is in intermediate state, bank_data:%s' % (order_id, bank_data))
if payment_response.return_content != None and self.asynchronous:
get_response().set_content_type('text/plain')
return payment_response.return_content
else:
if payment_response.is_error():
# TODO: here return failure message
get_session().message = ('info', _('Payment failed'))
else:
# TODO: Here return success message
get_session().message = ('error', _('Payment succeeded'))
url = get_publisher().get_frontoffice_url()
if get_session().user:
url += '/myspace/invoices/'
return redirect(url)
class PublicPaymentDirectory(Directory):
_q_exports = ['init', 'back', 'back_asynchronous']
back = PublicPaymentRegieBackDirectory(False)
back_asynchronous = PublicPaymentRegieBackDirectory(True)
def init(self):
if 'invoice_ids' not in get_request().form:
raise QueryError()
invoice_ids = get_request().form.get('invoice_ids').split(' ')
for invoice_id in invoice_ids:
if not Invoice.check_crc(invoice_id):
raise QueryError()
url = get_publisher().get_frontoffice_url() + '/payment/back/'
return request_payment(invoice_ids, url)
def notify_new_invoice(invoice):
notify_invoice(invoice, 'payment-new-invoice-email')
def notify_paid_invoice(invoice):
notify_invoice(invoice, 'payment-invoice-paid-email')
def notify_canceled_invoice(invoice):
notify_invoice(invoice, 'payment-invoice-canceled-email')
def notify_invoice(invoice, template):
user = invoice.get_user()
assert user is not None
regie = Regie.get(id=invoice.regie_id)
emails.custom_template_email(
template,
{'user': user, 'invoice': invoice, 'regie': regie, 'invoice_url': invoice.payment_url()},
user.email,
fire_and_forget=True,
)

View File

@ -1,624 +0,0 @@
import time
import pprint
import locale
import decimal
import datetime
from quixote import get_request, get_response, get_session, redirect
from quixote.directory import Directory, AccessControlled
from quixote.html import TemplateIO, htmltext
import wcs
import wcs.admin.root
from wcs.formdef import FormDef
from wcs.qommon import _, N_
from wcs.qommon import errors, misc, template, get_logger
from wcs.qommon.form import *
from wcs.qommon.admin.emails import EmailsDirectory
from wcs.qommon.backoffice.menu import html_top
from wcs.qommon import get_cfg
from .payments import eopayment, Regie, is_payment_supported, Invoice, Transaction, notify_paid_invoice
from wcs.qommon.admin.texts import TextsDirectory
if not set:
from sets import Set as set
def invoice_as_html(invoice):
r = TemplateIO(html=True)
r += htmltext('<div id="invoice">')
r += htmltext('<h2>%s</h2>') % _('Invoice: %s') % invoice.subject
r += htmltext('<h3>%s') % _('Amount: %s') % invoice.amount
r += htmltext(' &euro;</h3>')
r += htmltext('<!-- DEBUG \n')
r += 'Invoice:\n'
r += pprint.pformat(invoice.__dict__)
for transaction in Transaction.get_with_indexed_value('invoice_ids', invoice.id):
r += '\nTransaction:\n'
r += pprint.pformat(transaction.__dict__)
r += htmltext('\n-->')
if invoice.formdef_id and invoice.formdata_id and get_session().user == invoice.user_id:
formdef = FormDef.get(invoice.formdef_id)
if formdef:
formdata = formdef.data_class().get(invoice.formdata_id, ignore_errors=True)
if formdata:
name = _('%(form_name)s #%(formdata_id)s') % {
'form_name': formdata.formdef.name,
'formdata_id': formdata.id,
}
r += htmltext('<p class="from">%s <a href="%s">%s</a></p>') % (
_('From:'),
formdata.get_url(),
name,
)
r += htmltext('<p class="regie">%s</p>') % _('Regie: %s') % Regie.get(invoice.regie_id).label
r += htmltext('<p class="date">%s</p>') % _('Created on: %s') % misc.localstrftime(invoice.date)
if invoice.details:
r += htmltext('<p class="details">%s</p>') % _('Details:')
r += htmltext('<div class="details">')
r += htmltext(invoice.details)
r += htmltext('</div>')
if invoice.canceled:
r += htmltext('<p class="canceled">')
r += '%s' % _('canceled on %s') % misc.localstrftime(invoice.canceled_date)
if invoice.canceled_reason:
r += ' (%s)' % invoice.canceled_reason
r += htmltext('</p>')
if invoice.paid:
r += htmltext('<p class="paid">%s</p>') % _('paid on %s') % misc.localstrftime(invoice.paid_date)
r += htmltext('</div>')
return r.getvalue()
class InvoicesDirectory(Directory):
_q_exports = ['', 'multiple']
def _q_traverse(self, path):
if not is_payment_supported():
raise errors.TraversalError()
get_response().filter['bigdiv'] = 'profile'
if get_session().user:
# fake breadcrumb
get_response().breadcrumb.append(('myspace/', _('My Space')))
get_response().breadcrumb.append(('invoices/', _('Invoices')))
return Directory._q_traverse(self, path)
def multiple(self):
invoice_ids = get_request().form.get('invoice')
if type(invoice_ids) is not list:
return redirect('%s' % invoice_ids)
return redirect('+'.join(invoice_ids))
def _q_lookup(self, component):
if str('+') in component:
invoice_ids = component.split(str('+'))
else:
invoice_ids = [component]
for invoice_id in invoice_ids:
if not Invoice.check_crc(invoice_id):
raise errors.TraversalError()
template.html_top(_('Invoices'))
r = TemplateIO(html=True)
r += TextsDirectory.get_html_text('aq-invoice')
regies_id = set()
for invoice_id in invoice_ids:
try:
invoice = Invoice.get(invoice_id)
except KeyError:
raise errors.TraversalError()
r += invoice_as_html(invoice)
if not (invoice.paid or invoice.canceled):
regies_id.add(invoice.regie_id)
if len(regies_id) == 1:
r += htmltext('<p class="command">')
r += htmltext('<a href="%s/payment/init?invoice_ids=%s">') % (
get_publisher().get_frontoffice_url(),
component,
)
if len(invoice_ids) > 1:
r += _('Pay Selected Invoices')
else:
r += _('Pay')
r += htmltext('</a></p>')
if len(regies_id) > 1:
r += _('You can not pay to different regies.')
return r.getvalue()
def _q_index(self):
return redirect('..')
class RegieDirectory(Directory):
_q_exports = ['', 'edit', 'delete', 'options']
def __init__(self, regie):
self.regie = regie
def _q_index(self):
html_top('payments', title=_('Regie: %s') % self.regie.label)
r = TemplateIO(html=True)
get_response().filter['sidebar'] = self.get_sidebar()
r += htmltext('<h2>%s</h2>') % _('Regie: %s') % self.regie.label
r += get_session().display_message()
if self.regie.description:
r += htmltext('<div class="bo-block">')
r += htmltext('<p>')
r += self.regie.description
r += htmltext('</p>')
r += htmltext('</div>')
if self.regie.service:
r += htmltext('<div class="bo-block">')
url = get_publisher().get_frontoffice_url() + '/payment/back_asynchronous/'
url += str(self.regie.id)
r += htmltext('<p>')
r += '%s %s' % (_('Banking Service:'), self.regie.service)
r += htmltext(' (<a href="options">%s</a>)') % _('options')
r += htmltext('</p>')
r += htmltext('<p>')
r += '%s %s' % (_('Payment notification URL:'), url)
r += htmltext('</div>')
r += self.invoice_listing()
return r.getvalue()
def get_sidebar(self):
r = TemplateIO(html=True)
r += htmltext('<ul>')
r += htmltext('<li><a href="edit">%s</a></li>') % _('Edit')
r += htmltext('<li><a href="delete">%s</a></li>') % _('Delete')
r += htmltext('</ul>')
return r.getvalue()
def edit(self):
form = self.form()
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.has_errors():
self.submit(form)
return redirect('..')
html_top('payments', title=_('Edit Regie: %s') % self.regie.label)
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Edit Regie: %s') % self.regie.label
r += form.render()
return r.getvalue()
def form(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'label', title=_('Label'), required=True, value=self.regie.label)
form.add(
TextWidget, 'description', title=_('Description'), value=self.regie.description, rows=5, cols=60
)
form.add(
SingleSelectWidget,
'service',
title=_('Banking Service'),
value=self.regie.service,
required=True,
options=[
('dummy', _('Dummy (for tests)')),
('sips', 'SIPS'),
('systempayv2', 'systempay (Banque Populaire)'),
('spplus', _('SP+ (Caisse d\'epargne)')),
],
)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
return form
def submit(self, form):
for k in ('label', 'description', 'service'):
widget = form.get_widget(k)
if widget:
setattr(self.regie, k, widget.parse())
self.regie.store()
def delete(self):
form = Form(enctype='multipart/form-data')
form.widgets.append(HtmlWidget('<p>%s</p>' % _('You are about to irrevocably delete this regie.')))
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_submit() == 'cancel':
return redirect('..')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
r = TemplateIO(html=True)
html_top('payments', title=_('Delete Regie'))
r += htmltext('<h2>%s</h2>') % _('Deleting Regie: %s') % self.regie.label
r += form.render()
return r.getvalue()
else:
self.regie.remove_self()
return redirect('..')
def option_form(self):
form = Form(enctype='multipart/form-data')
module = eopayment.get_backend(self.regie.service)
service_options = {}
for infos in module.description['parameters']:
if 'default' in infos:
service_options[infos['name']] = infos['default']
service_options.update(self.regie.service_options or {})
banking_titles = {
('dummy', 'direct_notification_url'): N_('Direct Notification URL'),
('dummy', 'siret'): N_('Dummy SIRET'),
}
for infos in module.description['parameters']:
name = infos['name']
caption = infos.get('caption', name)
title = force_str(banking_titles.get((self.regie.service, name), caption))
kwargs = {}
widget = StringWidget
if infos.get('help_text') is not None:
kwargs['hint'] = _(infos['help_text'])
if infos.get('required', False):
kwargs['required'] = True
if infos.get('max_length') is not None:
kwargs['size'] = infos['max_length']
elif infos.get('length') is not None:
kwargs['size'] = infos['length']
else:
kwargs['size'] = 80
if kwargs['size'] > 100:
widget = TextWidget
kwargs['cols'] = 80
kwargs['rows'] = 5
if 'type' not in infos or infos['type'] is str:
form.add(widget, name, title=_(title), value=service_options.get(name), **kwargs)
elif infos['type'] is bool:
form.add(CheckboxWidget, name, title=title, value=service_options.get(name), **kwargs)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
return form
def options(self):
r = TemplateIO(html=True)
form = self.option_form()
module = eopayment.get_backend(self.regie.service)
try:
r += htmltext('<!-- Payment backend description: \n')
r += pprint.pformat(module.description)
r += htmltext('-->')
except:
return template.error_page(_('Payment backend do not list its options'))
raise errors.TraversalError()
r += htmltext('<!-- \n')
r += 'Service options\n'
r += pprint.pformat(self.regie.service_options)
r += htmltext('-->')
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.has_errors():
if self.submit_options(form, module):
return redirect('..')
html_top('payments', title=_('Edit Service Options'))
r += htmltext('<h2>%s</h2>') % _('Edit Service Options')
r += form.render()
return r.getvalue()
def submit_options(self, form, module):
# extra validation
error = False
for infos in module.description['parameters']:
widget = form.get_widget(infos['name'])
value = widget.parse()
if value and 'validation' in infos:
try:
if not infos['validation'](value):
widget.set_error(_('Valeur invalide'))
error = True
except ValueError as e:
widget.set_error(_(e.message))
error = True
if error:
return False
if not self.regie.service_options:
self.regie.service_options = {}
for infos in module.description['parameters']:
name = infos['name']
value = form.get_widget(name).parse()
if value is None:
value = ''
if hasattr(value, 'strip'):
value = value.strip()
if infos.get('default') is not None:
if value == infos['default']:
self.regie.service_options.pop(name, None)
else:
self.regie.service_options[name] = form.get_widget(name).parse()
elif not value:
self.regie.service_options.pop(name, None)
else:
self.regie.service_options[name] = form.get_widget(name).parse()
self.regie.store()
return True
PAGINATION = 50
def monetary_amount(self, val):
if not val:
return ''
if isinstance(val, str):
val = val.replace(',', '.')
return '%.2f' % decimal.Decimal(val)
def get_sort_by(self):
request = get_request()
sort_by = request.form.get('sort_by')
if sort_by not in ('date', 'paid_date', 'username'):
sort_by = 'date'
return sort_by
def get_invoices(self):
sort_by = self.get_sort_by()
invoices = list(Invoice.get_with_indexed_value('regie_id', self.regie.id, ignore_errors=True))
if 'date' in sort_by:
reverse = True
key = lambda i: getattr(i, sort_by) or datetime.datetime.now()
else:
reverse = False
key = lambda i: getattr(i, sort_by) or ''
invoices.sort(reverse=reverse, key=key)
return invoices
def unpay(self, request, invoice):
get_logger().info(
_('manually set unpaid invoice %(invoice_id)s in regie %(regie)s')
% dict(invoice_id=invoice.id, regie=self.regie.id)
)
transaction = Transaction()
transaction.invoice_ids = [invoice.id]
transaction.order_id = 'Manual action'
transaction.start = datetime.datetime.now()
transaction.end = transaction.start
transaction.bank_data = {
'action': 'Set unpaid',
'by': request.user.get_display_name() + ' (%s)' % request.user.id,
}
transaction.store()
invoice.unpay()
def pay(self, request, invoice):
get_logger().info(
_('manually set paid invoice %(invoice_id)s in regie %(regie)s')
% dict(invoice_id=invoice.id, regie=self.regie.id)
)
transaction = Transaction()
transaction.invoice_ids = [invoice.id]
transaction.order_id = 'Manual action'
transaction.start = datetime.datetime.now()
transaction.end = transaction.start
transaction.bank_data = {
'action': 'Set paid',
'by': request.user.get_display_name() + ' (%s)' % request.user.id,
}
transaction.store()
invoice.pay()
def invoice_listing(self):
request = get_request()
get_response().add_css_include('../../themes/auquotidien/admin.css')
if request.get_method() == 'POST':
invoice_id = request.form.get('id')
invoice = Invoice.get(invoice_id, ignore_errors=True)
if invoice:
if 'unpay' in request.form:
self.unpay(request, invoice)
elif 'pay' in request.form:
self.pay(request, invoice)
return redirect('')
try:
offset = int(request.form.get('offset', 0))
except ValueError:
offset = 0
r = TemplateIO(html=True)
r += htmltext('<table id="invoice-listing" borderspacing="0">')
r += htmltext('<thead>')
r += htmltext('<tr>')
r += htmltext('<td><a href="?sort_by=date&offset=%d">Creation</a></td>') % offset
r += htmltext('<td>Amount</td>')
r += htmltext('<td><a href="?sort_by=paid_date&offset=%d">Paid</a></td>') % offset
r += htmltext('<td><a href="?sort_by=username&offset=%d">User</a></td>') % offset
r += htmltext('<td>Titre</td>')
r += htmltext('<td></td>')
r += htmltext('</tr>')
r += htmltext('</thead>')
invoices = self.get_invoices()
for invoice in invoices[offset : offset + self.PAGINATION]:
r += htmltext('<tbody class="invoice-rows">')
r += htmltext('<tr class="invoice-row"><td>')
r += misc.localstrftime(invoice.date)
r += htmltext('</td><td class="amount">')
r += self.monetary_amount(invoice.amount)
r += htmltext('</td><td>')
if invoice.paid:
r += misc.localstrftime(invoice.paid_date)
else:
r += ''
r += htmltext('</td><td>')
user = invoice.get_user()
if user:
r += user.name
r += htmltext('</td><td class="subject">%s</td>') % (invoice.subject or '')
r += htmltext('<td>')
r += htmltext('<form method="post">')
r += htmltext('<input type="hidden" name="id" value="%s"/> ') % invoice.id
if invoice.paid:
r += htmltext('<input type="submit" name="unpay" value="%s"/>') % _('Set unpaid')
else:
r += htmltext('<input type="submit" name="pay" value="%s"/>') % _('Set paid')
r += htmltext('</form>')
r += htmltext('</td></tr>')
transactions = list(Transaction.get_with_indexed_value('invoice_ids', invoice.id))
for transaction in sorted(transactions, key=lambda x: x.start):
r += htmltext('<tr>')
r += htmltext('<td></td>')
r += htmltext('<td colspan="5">')
r += 'OrderID: %s' % transaction.order_id
r += ' Start: %s' % transaction.start
if transaction.end:
r += ' End: %s' % transaction.end
if transaction.bank_data:
r += ' Bank data: %r' % transaction.bank_data
r += htmltext('</td>')
r += htmltext('</tr>')
r += htmltext('</tbody>')
r += htmltext('</tbody></table>')
if offset != 0:
r += htmltext('<a href="?offset=%d>%s</a> ') % (max(0, offset - self.PAGINATION), _('Previous'))
if offset + self.PAGINATION < len(invoices):
r += htmltext('<a href="?offset=%d>%s</a> ') % (max(0, offset - self.PAGINATION), _('Previous'))
return r.getvalue()
class RegiesDirectory(Directory):
_q_exports = ['', 'new']
def _q_traverse(self, path):
get_response().breadcrumb.append(('regie/', _('Regies')))
return Directory._q_traverse(self, path)
def _q_index(self):
return redirect('..')
def new(self):
regie_ui = RegieDirectory(Regie())
form = regie_ui.form()
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.has_errors():
regie_ui.submit(form)
return redirect('%s/' % regie_ui.regie.id)
get_response().breadcrumb.append(('new', _('New Regie')))
html_top('payments', title=_('New Regie'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New Regie')
r += form.render()
return r.getvalue()
def _q_lookup(self, component):
try:
regie = Regie.get(component)
except KeyError:
raise errors.TraversalError()
get_response().breadcrumb.append((str(regie.id), regie.label))
return RegieDirectory(regie)
class PaymentsDirectory(AccessControlled, Directory):
_q_exports = ['', 'regie']
label = N_('Payments')
regie = RegiesDirectory()
def is_accessible(self, user):
from .backoffice import check_visibility
return check_visibility('payments', user)
def _q_access(self):
user = get_request().user
if not user:
raise errors.AccessUnauthorizedError()
if not self.is_accessible(user):
raise errors.AccessForbiddenError(
public_msg=_('You are not allowed to access Payments Management'), location_hint='backoffice'
)
get_response().breadcrumb.append(('payments/', _('Payments')))
def _q_index(self):
html_top('payments', _('Payments'))
get_response().filter['sidebar'] = self.get_sidebar()
r = TemplateIO(html=True)
if not is_payment_supported:
r += htmltext('<p class="infonotice">')
r += _('Payment is not supported.')
r += htmltext('</p>')
regies = Regie.select()
r += htmltext('<h2>%s</h2>') % _('Regies')
if not regies:
r += htmltext('<p>')
r += _('There are no regies defined at the moment.')
r += htmltext('</p>')
r += htmltext('<ul class="biglist" id="regies-list">')
for l in regies:
regie_id = l.id
r += htmltext('<li class="biglistitem" id="itemId_%s">') % regie_id
r += htmltext('<strong class="label"><a href="regie/%s/">%s</a></strong>') % (regie_id, l.label)
r += htmltext('</li>')
r += htmltext('</ul>')
return r.getvalue()
def get_sidebar(self):
r = TemplateIO(html=True)
r += htmltext('<ul id="sidebar-actions">')
r += htmltext(' <li><a class="new-item" href="regie/new">%s</a></li>') % _('New Regie')
r += htmltext('</ul>')
return r.getvalue()
TextsDirectory.register('aq-invoice', N_('Message on top of an invoice'), category=N_('Invoices'))
EmailsDirectory.register(
'payment-new-invoice-email',
N_('New invoice'),
N_('Available variables: user, regie, invoice, invoice_url'),
category=N_('Invoices'),
default_subject=N_('New invoice'),
default_body=N_(
'''
A new invoice is available at [invoice_url].
'''
),
)
EmailsDirectory.register(
'payment-invoice-paid-email',
N_('Paid invoice'),
N_('Available variables: user, regie, invoice, invoice_url'),
category=N_('Invoices'),
default_subject=N_('Paid invoice'),
default_body=N_(
'''
The invoice [invoice_url] has been paid.
'''
),
)
EmailsDirectory.register(
'payment-invoice-canceled-email',
N_('Canceled invoice'),
N_('Available variables: user, regie, invoice, invoice_url'),
category=N_('Invoices'),
default_subject=N_('Canceled invoice'),
default_body=N_(
'''
The invoice [invoice.id] has been canceled.
'''
),
)

View File

@ -37,8 +37,6 @@ from wcs.qommon.admin.emails import EmailsDirectory
from wcs.qommon.admin.texts import TextsDirectory
from .myspace import MyspaceDirectory
from .payments import PublicPaymentDirectory
from .payments_ui import InvoicesDirectory
from . import admin
@ -240,8 +238,6 @@ class AlternateRootDirectory(OldRootDirectory):
'__version__',
'themes',
'pages',
'payment',
'invoices',
'roles',
'api',
'code',
@ -260,8 +256,6 @@ class AlternateRootDirectory(OldRootDirectory):
ident = AlternateIdentDirectory()
myspace = MyspaceDirectory()
saml = Saml2Directory()
payment = PublicPaymentDirectory()
invoices = InvoicesDirectory()
code = wcs.forms.root.TrackingCodesDirectory()
preview = AlternatePreviewDirectory()

View File

@ -111,55 +111,3 @@ def test_aq_permissions_panel(empty_siteoptions):
resp = app.get('/backoffice/settings/')
assert 'aq/permissions' in resp.text
resp = app.get('/backoffice/settings/aq/permissions')
def test_menu_items(empty_siteoptions):
create_superuser()
role = create_role()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'auquotidien-payments', 'true')
pub.site_options.write(fd)
for area in ('payments',):
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'auquotidien-%s' % area, 'true')
pub.site_options.write(fd)
pub.cfg['aq-permissions'] = {area: None}
pub.write_cfg()
user1.is_admin = True
user1.roles = []
user1.store()
app = login(get_app(pub))
resp = app.get('/backoffice/')
assert not '/%s/' % area in resp.text
resp = app.get('/backoffice/%s/' % area, status=403)
pub.cfg['aq-permissions'] = {area: 'XXX'}
pub.write_cfg()
resp = app.get('/backoffice/')
assert '/%s/' % area in resp.text
resp = app.get('/backoffice/%s/' % area, status=200)
user1.is_admin = False
user1.roles = [role.id]
user1.store()
resp = app.get('/backoffice/')
assert not '/%s/' % area in resp.text
resp = app.get('/backoffice/%s/' % area, status=403)
user1.is_admin = False
user1.roles = [role.id, 'XXX']
user1.store()
resp = app.get('/backoffice/')
assert '/%s/' % area in resp.text
resp = app.get('/backoffice/%s/' % area, status=200)

View File

@ -1,31 +0,0 @@
import shutil
from quixote import cleanup
from wcs.qommon.http_request import HTTPRequest
from auquotidien.modules import payments
from utilities import get_app, login, create_temporary_pub
def setup_module(module):
cleanup()
global pub
pub = create_temporary_pub()
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub.cfg['identification'] = {'methods': ['password']}
pub.write_cfg()
def teardown_module(module):
shutil.rmtree(pub.APP_DIR)
def test_invoice_crc():
invoice = payments.Invoice()
for i in range(20):
new_id = invoice.get_new_id()
assert payments.Invoice.check_crc(new_id)