add Saga payment method (#46502)

This commit is contained in:
Benjamin Dauvergne 2020-09-09 22:47:58 +02:00
parent 4ba0534d01
commit 0e4f6f248e
7 changed files with 463 additions and 2 deletions

View File

@ -31,7 +31,7 @@ from .common import ( # noqa: F401
__all__ = ['Payment', 'URL', 'HTML', 'FORM', 'SIPS', 'SYSTEMPAY',
'TIPI', 'DUMMY', 'get_backend', 'RECEIVED', 'ACCEPTED', 'PAID',
'DENIED', 'CANCELED', 'CANCELLED', 'ERROR', 'WAITING',
'EXPIRED', 'get_backends', 'PAYFIP_WS']
'EXPIRED', 'get_backends', 'PAYFIP_WS', 'SAGA']
if six.PY3:
__all__.extend(['KEYWARE', 'MOLLIE'])
@ -47,6 +47,7 @@ PAYZEN = 'payzen'
PAYFIP_WS = 'payfip_ws'
KEYWARE = 'keyware'
MOLLIE = 'mollie'
SAGA = 'saga'
logger = logging.getLogger(__name__)
@ -57,7 +58,7 @@ def get_backend(kind):
return module.Payment
__BACKENDS = [DUMMY, SIPS, SIPS2, SYSTEMPAY, OGONE, PAYBOX, PAYZEN,
TIPI, PAYFIP_WS, KEYWARE, MOLLIE]
TIPI, PAYFIP_WS, KEYWARE, MOLLIE, SAGA]
def get_backends():

250
eopayment/saga.py Normal file
View File

@ -0,0 +1,250 @@
# -*- coding: utf-8 -*-
# eopayment - online payment library
# Copyright (C) 2011-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals, print_function
import functools
from six.moves.urllib.parse import urljoin, parse_qs
import lxml.etree as ET
import zeep
import zeep.exceptions
from .common import (PaymentException, PaymentCommon, ResponseError, URL, PAID,
DENIED, CANCELLED, ERROR, PaymentResponse)
_zeep_transport = None
class SagaError(PaymentException):
pass
class Saga(object):
def __init__(self, wsdl_url, service_url=None, zeep_client_kwargs=None):
self.wsdl_url = wsdl_url
kwargs = (zeep_client_kwargs or {}).copy()
if _zeep_transport and 'transport' not in kwargs:
kwargs['transport'] = _zeep_transport
self.client = zeep.Client(wsdl_url, **kwargs)
# distribued WSDL is wrong :/
if service_url:
self.client.service._binding_options['address'] = service_url
def soap_call(self, operation, content_tag, **kwargs):
content = getattr(self.client.service, operation)(**kwargs)
if 'ISO-8859-1' in content:
encoded_content = content.encode('latin1')
else:
encoded_content = content.encode('utf-8')
try:
tree = ET.fromstring(encoded_content)
except Exception:
raise SagaError('Invalid SAGA response "%s"' % content[:1024])
if tree.tag == 'erreur':
raise SagaError(tree.text)
if tree.tag != content_tag:
raise SagaError('Invalid SAGA response "%s"' % content[:1024])
return tree
def transaction(self, num_service, id_tiers, compte, lib_ecriture, montant,
urlretour_asynchrone, email, urlretour_synchrone):
tree = self.soap_call(
'Transaction',
'url',
num_service=num_service,
id_tiers=id_tiers,
compte=compte,
lib_ecriture=lib_ecriture,
montant=montant,
urlretour_asynchrone=urlretour_asynchrone,
email=email,
urlretour_synchrone=urlretour_synchrone)
# tree == <url>...</url>
return tree.text
def page_retour(self, operation, idop):
tree = self.soap_call(
operation,
'ok',
idop=idop)
# tree == <ok id_tiers="A1"
# etat="paye" email="albert,dupond@monsite.com" num_service="222222"
# montant="100.00" compte="708"
# lib_ecriture="Paiement pour M. Albert Dupondréservationsejourxxxx" />
return tree.attrib
def page_retour_synchrone(self, idop):
return self.page_retour('PageRetourSynchrone', idop)
def page_retour_asynchrone(self, idop):
return self.page_retour('PageRetourAsynchrone', idop)
class Payment(PaymentCommon):
description = {
'caption': 'Système de paiement Saga de Futur System',
'parameters': [
{
'name': 'base_url',
'caption': 'URL de base du WSDL',
'help_text': 'Sans la partie /paiement_internet_ws_ministere?wsdl',
'required': True,
},
{
'name': 'num_service',
'caption': 'Numéro du service',
'required': True,
},
{
'name': 'compte',
'caption': 'Compte de recettes',
'required': True,
},
{
'name': 'normal_return_url',
'caption': 'URL de retour',
'default': '',
'required': True,
},
{
'name': 'automatic_return_url',
'caption': 'URL de notification',
'required': False,
},
]
}
@property
def saga(self):
return Saga(
wsdl_url=urljoin(self.base_url, 'paiement_internet_ws_ministere?wsdl'))
def request(self, amount, email, subject, orderid=None, **kwargs):
num_service = self.num_service
id_tiers = '-1'
compte = self.compte
lib_ecriture = subject
montant = self.clean_amount(amount, max_amount=100000, cents=False)
urlretour_synchrone = self.normal_return_url
urlretour_asynchrone = self.automatic_return_url
url = self.saga.transaction(
num_service=num_service,
id_tiers=id_tiers,
compte=compte,
lib_ecriture=lib_ecriture,
montant=montant,
urlretour_asynchrone=urlretour_asynchrone,
email=email,
urlretour_synchrone=urlretour_synchrone)
try:
idop = parse_qs(url.split('?', 1)[-1])['idop'][0]
except Exception:
raise SagaError('Invalid payment URL, no idop: %s' % url)
return str(idop), URL, url
def response(self, query_string, **kwargs):
fields = parse_qs(query_string, True)
idop = (fields.get('idop') or [None])[0]
if not idop:
raise ResponseError('missing idop parameter in query')
redirect = kwargs.get('redirect', False)
if redirect:
response = self.saga.page_retour_synchrone(idop=idop)
else:
response = self.saga.page_retour_asynchrone(idop=idop)
etat = response['etat']
if etat == 'paye':
result = PAID
bank_status = 'paid'
elif etat == 'refus':
result = DENIED
bank_status = 'refused'
elif etat == 'abandon':
result = CANCELLED
bank_status = 'cancelled'
else:
result = ERROR
bank_status = 'unknown result code: etat=%r' % etat
return PaymentResponse(
result=result,
bank_status=bank_status,
signed=True,
bank_data=response,
order_id=idop,
transaction_id=idop,
test=False)
if __name__ == '__main__':
import click
def show_payfip_error(func):
@functools.wraps(func)
def f(*args, **kwargs):
try:
return func(*args, **kwargs)
except SagaError as e:
click.echo(click.style('SAGA ERROR : %s' % e, fg='red'))
return f
@click.group()
@click.option('--wsdl-url')
@click.option('--service-url', default=None)
@click.pass_context
def main(ctx, wsdl_url, service_url):
import logging
logging.basicConfig(level=logging.INFO)
# hide warning from zeep
logging.getLogger('zeep.wsdl.bindings.soap').level = logging.ERROR
ctx.obj = Saga(wsdl_url=wsdl_url, service_url=service_url)
@main.command()
@click.option('--num-service', type=str, required=True)
@click.option('--id-tiers', type=str, required=True)
@click.option('--compte', type=str, required=True)
@click.option('--lib-ecriture', type=str, required=True)
@click.option('--montant', type=str, required=True)
@click.option('--urlretour-asynchrone', type=str, required=True)
@click.option('--email', type=str, required=True)
@click.option('--urlretour-synchrone', type=str, required=True)
@click.pass_obj
@show_payfip_error
def transaction(saga, num_service, id_tiers, compte, lib_ecriture, montant,
urlretour_asynchrone, email, urlretour_synchrone):
url = saga.transaction(
num_service=num_service,
id_tiers=id_tiers,
compte=compte,
lib_ecriture=lib_ecriture,
montant=montant,
urlretour_asynchrone=urlretour_asynchrone,
email=email,
urlretour_synchrone=urlretour_synchrone)
print('url:', url)
main()

112
tests/conftest.py Normal file
View File

@ -0,0 +1,112 @@
# eopayment - online payment library
# Copyright (C) 2011-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import pytest
import httmock
import lxml.etree as ET
from requests.adapters import HTTPAdapter
from requests import Session
def pytest_addoption(parser):
parser.addoption("--save-http-session", action="store_true", help="save HTTP session")
parser.addoption("--target-url", help="target URL")
class LoggingAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
self.history = []
super(LoggingAdapter, self).__init__(*args, **kwargs)
def send(self, request, *args, **kwargs):
response = super(LoggingAdapter, self).send(request, *args, **kwargs)
self.history.append((request, response))
return response
def xmlindent(content):
if hasattr(content, 'encode') or hasattr(content, 'decode'):
content = ET.fromstring(content)
return ET.tostring(content, pretty_print=True).decode('utf-8', 'ignore')
@pytest.fixture
def record_http_session(request):
module_name = request.module.__name__.split('test_', 1)[-1]
function_name = request.function.__name__
save = request.config.getoption('--save-http-session')
filename = 'tests/data/%s-%s.json' % (module_name, function_name)
def is_xml_content_type(r):
headers = r.headers
content_type = headers.get('content-type')
return content_type and content_type.startswith(('text/xml', 'application/xml'))
if save:
session = Session()
adapter = LoggingAdapter()
session.mount('http://', adapter)
session.mount('https://', adapter)
try:
yield session
finally:
with open(filename, 'w') as fd:
history = []
for request, response in adapter.history:
request_content = (request.body or b'')
response_content = (response.content or b'')
if is_xml_content_type(request):
request_content = xmlindent(request_content)
else:
request_content = request_content.decode('utf-8')
if is_xml_content_type(response):
response_content = xmlindent(response_content)
else:
response_content = response_content.decode('utf-8')
history.append((request_content, response_content))
json.dump(history, fd)
else:
with open(filename) as fd:
history = json.load(fd)
class Mocker:
counter = 0
@httmock.urlmatch()
def mock(self, url, request):
expected_request_content, response_content = history[self.counter]
self.counter += 1
if expected_request_content:
request_content = request.body or b''
if is_xml_content_type(request):
request_content = xmlindent(request_content)
else:
request_content = request_content.decode('utf-8')
assert request_content == expected_request_content
return response_content
with httmock.HTTMock(Mocker().mock):
yield None
@pytest.fixture
def target_url(request):
return request.config.getoption('--target-url') or 'https://target.url/'

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

95
tests/test_saga.py Normal file
View File

@ -0,0 +1,95 @@
# coding: utf-8
#
# eopayment - online payment library
# Copyright (C) 2011-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function, unicode_literals
import pytest
import zeep.transports
import eopayment
@pytest.fixture
def saga(record_http_session):
if record_http_session:
from eopayment import saga
saga._zeep_transport = zeep.transports.Transport(session=record_http_session)
try:
yield None
finally:
saga._zeep_transport = None
else:
yield
@pytest.fixture
def backend_factory(saga, target_url):
def factory(**kwargs):
return eopayment.Payment('saga', dict({
'base_url': target_url,
'num_service': '868',
'compte': '70688',
'automatic_return_url': 'https://automatic.notif.url/automatic/',
'normal_return_url': 'https://normal.notif.url/normal/',
}, **kwargs))
return factory
def test_error_parametrage(backend_factory):
payment = backend_factory(num_service='1', compte='1')
with pytest.raises(eopayment.PaymentException, match='Impossible de déterminer le paramétrage'):
transaction_id, kind, url = payment.request(
amount='10.00',
email='john.doe@example.com',
subject='Réservation concert XYZ numéro 1234')
def test_request(backend_factory):
transaction_id, kind, url = backend_factory().request(
amount='10.00',
email='john.doe@example.com',
subject='Réservation concert XYZ numéro 1234')
assert transaction_id == '347b2060-1a37-11eb-af92-0213ad91a103'
assert kind == eopayment.URL
assert url == 'https://www.tipi.budget.gouv.fr/tpa/paiementws.web?idop=347b2060-1a37-11eb-af92-0213ad91a103'
def test_response(backend_factory):
response = backend_factory().response('idop=28b52f40-1ace-11eb-8ce3-0213ad91a104', redirect=False)
assert response.__dict__ == {
'bank_data': {
'email': 'john.doe@entrouvert.com',
'etat': 'paye',
'id_tiers': '-1',
'montant': '10.00',
'num_service': '868',
'numcp': '70688',
'numcpt_lib_ecriture': 'COUCOU'
},
'bank_status': 'paid',
'order_id': '28b52f40-1ace-11eb-8ce3-0213ad91a104',
'result': 3,
'return_content': None,
'signed': True,
'test': False,
'transaction_date': None,
'transaction_id': '28b52f40-1ace-11eb-8ce3-0213ad91a104',
}