# combo/tests/test_lingo_payment.py
#
# 2362 lines
# 99 KiB
# Python

import json
import re
import urllib.parse
import uuid
from contextlib import contextmanager
from datetime import datetime, timedelta
from decimal import Decimal
from unittest import mock
import eopayment
import httmock
import pytest
from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import User
from django.core.management import CommandError, call_command
from django.http.request import QueryDict
from django.test import override_settings
from django.urls import reverse
from django.utils import timezone
from django.utils.timezone import now, utc
from mellon.models import UserSAMLIdentifier
from requests.exceptions import ConnectionError
from combo.apps.lingo.models import (
EXPIRED,
BasketItem,
LingoBasketCell,
LingoException,
LingoRecentTransactionsCell,
PaymentBackend,
Regie,
Transaction,
TransactionOperation,
)
from combo.apps.lingo.views import signing_dumps, signing_loads
from combo.data.models import Page
from combo.utils import sign_url
from .test_manager import login
pytestmark = pytest.mark.django_db
def get_url(with_payment_backend, view_name, regie):
    """Reverse the URL of a lingo view for ``regie``.

    When ``with_payment_backend`` is true, the ``<view_name>-payment-backend``
    route is reversed with the primary key of the regie's payment backend;
    otherwise ``view_name`` itself is reversed with the regie's primary key.
    """
    if not with_payment_backend:
        return reverse(view_name, kwargs={'regie_pk': regie.id})
    backend_view_name = view_name + '-payment-backend'
    return reverse(backend_view_name, kwargs={'payment_backend_pk': regie.payment_backend.id})
@contextmanager
def check_log(caplog, message):
    """Assert that the wrapped code emits a log record matching ``message``.

    ``message`` is used as a regular expression (``re.search``) against the
    ``message`` attribute of every record appended to ``caplog.records`` while
    the ``with`` body runs; records present before entering are ignored.
    """
    first_new_record = len(caplog.records)
    yield
    found = False
    for record in caplog.records[first_new_record:]:
        if re.search(message, record.message):
            found = True
            break
    assert found, '%r not found in log records' % message
@pytest.fixture
def payment_backend():
    """Create and return a payment backend using the ``dummy`` service."""
    backend_attrs = {
        'label': 'test1',
        'slug': 'test1',
        'service': 'dummy',
        'service_options': {'siret': '1234'},
    }
    return PaymentBackend.objects.create(**backend_attrs)
@pytest.fixture
def regie(payment_backend):
    """Save and return a local regie bound to ``payment_backend``.

    The regie accepts several basket items per payment and has a minimum
    payment amount of 4.5.
    """
    local_regie = Regie(
        label='Test',
        slug='test',
        description='test',
        can_pay_only_one_basket_item=False,
        payment_min_amount=Decimal(4.5),
        payment_backend=payment_backend,
    )
    local_regie.save()
    return local_regie
@pytest.fixture
def remote_regie(payment_backend):
    """Save and return a regie that has a ``webservice_url`` configured."""
    remote = Regie(
        label='Remote',
        slug='remote',
        description='remote',
        payment_min_amount=Decimal(2.0),
        payment_backend=payment_backend,
        webservice_url='http://example.org/regie',  # is_remote
    )
    remote.save()
    return remote
@pytest.fixture
def basket_page():
    """Save and return a page holding a single basket cell.

    The page slug is ``test_basket_cell``; tests fetch it at
    ``/test_basket_cell/``.
    """
    page = Page(title='xxx', slug='test_basket_cell', template_name='standard')
    page.save()
    LingoBasketCell(page=page, placeholder='content', order=0).save()
    return page
@pytest.fixture
def user():
    """Return the ``admin`` user, creating it when it does not exist yet."""
    existing = User.objects.filter(username='admin').first()
    if existing is not None:
        return existing
    return User.objects.create_user('admin', password='admin', email='foo@example.com')
@pytest.fixture
def user_name_id(user):
    """Attach a fixed SAML name identifier to ``user`` and return it."""
    saml_name_id = '7d6a86ae70f746f4887f22bad212f836'
    UserSAMLIdentifier.objects.create(user=user, name_id=saml_name_id)
    return saml_name_id
@pytest.fixture(params=['orig', 'sign_key'])
def key(request, settings):
    """Return the secret used to sign API URLs, in two flavours.

    * ``orig``: declares a ``wcs`` known service in ``settings.KNOWN_SERVICES``
      and returns its secret.
    * ``sign_key``: returns ``settings.LINGO_API_SIGN_KEY``.
    """
    if request.param != 'orig':
        return settings.LINGO_API_SIGN_KEY

    secret = 'abcde'
    wcs_service = {
        'url': 'http://example.org/',
        'orig': 'http://combo.example.net/',
        'verif_orig': 'wcs',
        'secret': secret,
    }
    settings.KNOWN_SERVICES = {'wcs': {'wcs1': wcs_service}}
    return secret
def assert_payment_status(url, transaction_id=None):
    """Assert that ``url`` targets the lingo payment status view.

    :param url: a URL string, or any object exposing the URL as ``.path``.
    :param transaction_id: when truthy, the URL must carry a signed
        ``transaction-id`` query parameter that contains no ``:`` and that
        ``signing_loads`` decodes back to this value.
    :raises AssertionError: when one of the checks fails.
    """
    if hasattr(url, 'path'):
        url = url.path
    if transaction_id:
        url, part = url.split('?')
        query = urllib.parse.parse_qs(part)
        assert 'transaction-id' in query
        # parse_qs maps each name to a *list* of values: the ':' check has to
        # look inside the value itself, not test membership in that list.
        signed_transaction_id = query['transaction-id'][0]
        assert ':' not in signed_transaction_id
        assert signing_loads(signed_transaction_id) == transaction_id
    assert url.startswith('/lingo/payment-status')
def test_default_regie():
    """Check how the ``is_default`` flag moves between regies.

    The first regie saved is the default one; a second regie is not, and
    promoting it explicitly removes the flag from the first.
    """
    payment_backend = PaymentBackend.objects.create(label='foo', slug='foo')
    Regie.objects.all().delete()

    regie1 = Regie(label='foo', slug='foo', payment_backend=payment_backend)
    # Configure the instance being saved: the bare name ``regie`` is the
    # module-level fixture function here, not a Regie object.
    regie1.can_pay_only_one_basket_item = False
    regie1.save()
    assert bool(regie1.is_default) is True

    regie2 = Regie(label='bar', slug='bar', payment_backend=payment_backend)
    regie2.save()
    assert bool(regie2.is_default) is False

    regie2.is_default = True
    regie2.save()

    # reload both rows to observe what was actually stored
    regie2 = Regie.objects.get(id=regie2.id)
    assert bool(regie2.is_default) is True
    regie1 = Regie.objects.get(id=regie1.id)
    assert bool(regie1.is_default) is False
def test_regie_api(app):
    """The regies API lists every regie, the default one first."""
    listing_url = reverse('api-regies')

    empty_listing = json.loads(app.get(listing_url).text)
    assert len(empty_listing.get('data')) == 0

    # reuse the other test as a setup step: it leaves two regies behind
    test_default_regie()

    listing = json.loads(app.get(listing_url).text)
    assert len(listing.get('data')) == 2
    assert listing.get('data')[0]['id'] == Regie.objects.get(is_default=True).slug
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
def test_payment_min_amount(mock_trigger_request, app, basket_page, regie, user):
    """A basket below the regie's minimum amount does not start a payment.

    The two items add up to 3.9 while the ``regie`` fixture sets
    ``payment_min_amount`` to 4.5: submitting the basket form redirects and
    no outgoing request is made.
    """
    mock_response = mock.Mock()
    mock_response.status_code = 403
    mock_trigger_request.return_value = mock_response

    items = {
        'item1': {'amount': '1.5', 'source_url': '/item/1'},
        'item2': {'amount': '2.4', 'source_url': '/item/2'},
    }
    # the created rows are only needed in the database; their pks are not used
    for subject, details in items.items():
        BasketItem.objects.create(user=user, regie=regie, subject=subject, **details)

    resp = login(app).get('/test_basket_cell/')
    resp = resp.form.submit()
    assert resp.status_code == 302
    assert mock_trigger_request.call_count == 0
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
def test_transaction_manual_validation(mock_trigger_request, app, basket_page, user, monkeypatch):
    """The regie's ``manual_validation`` option reaches the eopayment request."""
    mock_trigger_request.return_value = mock.Mock(status_code=403)

    payzen_backend = PaymentBackend.objects.create(
        label='test1',
        slug='test1',
        service='payzen',
        service_options={'vads_site_id': '12345678', 'secret_test': 'plkoBdfcx987dhft6'},
    )
    manual_regie = Regie.objects.create(
        label='Test',
        slug='test',
        description='test',
        payment_backend=payzen_backend,
        transaction_options={'manual_validation': True},
        can_pay_only_one_basket_item=False,
    )
    BasketItem.objects.create(
        user=user,
        regie=manual_regie,
        subject='item1',
        amount='1.5',
        source_url='http://example.org/item/1/',
    )

    with mock.patch('eopayment.Payment') as MockPayment:
        payment_request = MockPayment.return_value.request
        payment_request.return_value = (9876, 3, {})
        basket = login(app).get('/test_basket_cell/')
        basket.form.submit()
        assert payment_request.call_args[1]['manual_validation'] is True

    # exactly one call was made to the item's trigger URL
    assert mock_trigger_request.call_count == 1
    method, trigger_url = mock_trigger_request.call_args[0][0], mock_trigger_request.call_args[0][1]
    assert method == 'GET'
    assert trigger_url.startswith('http://example.org/item/1/jump/trigger/paid')
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_successfull_items_payment(mock_trigger_request, app, basket_page, regie, user, with_payment_backend):
    """Pay a four-item basket through the dummy backend, up to the status page.

    The callback view is reached through the regie-based or the payment-backend
    based URL depending on ``with_payment_backend``.
    """
    mock_response = mock.Mock()
    mock_response.status_code = 403
    mock_trigger_request.return_value = mock_response

    items = {
        'item1': {'amount': '10.5', 'source_url': 'http://example.org/item/1/'},
        'item2': {'amount': '42', 'source_url': 'http://example.org/item/2/'},
        'item3': {'amount': '100', 'source_url': 'http://example.org/item/3/'},
        'item4': {'amount': '354', 'source_url': 'http://example.org/item/4/'},
    }
    # the created rows are only needed in the database; their pks are not used
    for subject, details in items.items():
        BasketItem.objects.create(user=user, regie=regie, subject=subject, **details)

    resp = login(app).get('/test_basket_cell/')
    resp = resp.form.submit()
    assert mock_trigger_request.call_count == 4  # source_url/jump/trigger/paid items had been verified
    assert resp.status_code == 302
    location = resp.location
    assert 'dummy-payment' in location
    parsed = urllib.parse.urlparse(location)

    # get return_url and transaction id from location
    qs = urllib.parse.parse_qs(parsed.query)
    args = {'transaction_id': qs['transaction_id'][0], 'signed': True, 'ok': True, 'reason': 'Paid'}

    # make sure return url is the user return URL
    assert urllib.parse.urlparse(qs['return_url'][0]).path.startswith(
        '/lingo/return-payment-backend/%s/' % regie.payment_backend.id
    )

    # simulate successful call to callback URL
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request'):
        resp = app.get(get_url(with_payment_backend, 'lingo-callback', regie), params=args)
    assert resp.status_code == 200

    # simulate successful return URL
    resp = app.get(qs['return_url'][0], params=args)

    # redirect to payment status
    assert resp.status_code == 302
    assert urllib.parse.urlparse(resp.url).path.startswith('/lingo/payment-status')
    resp = resp.follow()
    assert 'Your payment has been succesfully registered.' in resp.text
    assert urllib.parse.urlparse(resp.html.find('a', {'id': 'next-url'})['href']).path == '/test_basket_cell/'
def test_add_amount_to_basket(app, key, regie, user_name_id):
    """Exercise the signed ``api-add-basket-item`` endpoint.

    Covers the error responses (missing amount, unknown user, bad JSON, bad
    signature, invalid amounts, unknown regie), the way amounts from the query
    string, the payload and ``extra`` are summed, comma-decimal amounts under
    a ``fr-be`` language code, and the regie an item is attached to (the
    default one, or the one named by ``regie_id`` as an id or a slug).
    """
    # a second regie, used further down to check default-regie selection
    payment_backend = PaymentBackend.objects.create(
        label='test2', slug='test2', service='dummy', service_options={'siret': '1234'}
    )
    other_regie = Regie(
        label='test2', slug='test2', payment_backend=payment_backend, can_pay_only_one_basket_item=False
    )
    other_regie.save()

    # no amount in the payload
    data = {'display_name': 'test amount', 'url': 'http://example.com/form/42/'}
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id)
    url = sign_url(url, key)
    resp = app.post_json(url, params=data, status=400)
    assert 'missing amount parameter' in resp.text

    # NameId that matches no user
    amount = 42
    data['amount'] = amount
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), 'unknown_id')
    url = sign_url(url, key)
    resp = app.post_json(url, params=data, status=400)
    assert 'unknown user' in resp.text

    # empty body instead of JSON
    amount = 42
    data['amount'] = amount
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), 'unknown_id')
    url = sign_url(url, key)
    resp = app.post(url, params='', status=400)
    assert 'bad json' in resp.text

    # nominal case: the item lands in the `regie` fixture
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id)
    url = sign_url(url, key)
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.text)['result'] == 'success'
    assert BasketItem.objects.filter(amount=amount).exists()
    assert BasketItem.objects.filter(amount=amount)[0].regie_id == regie.id

    # altering the URL after signing is rejected
    resp = app.post_json('%s&amount=10' % url, params=data, status=403)  # bad signature

    # payload amount + extra amount: 42 + 22.22
    data['extra'] = {'amount': '22.22'}
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id)
    url = sign_url(url, key)
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.text)['result'] == 'success'
    assert BasketItem.objects.filter(amount=Decimal('64.22')).exists()

    # list values and a query string amount are summed too: 42 + 22.22 + 12 + 5
    data['amount'] = [amount]
    data['extra'] = {'amount': ['22.22', '12']}
    url = '%s?NameId=%s&orig=wcs&amount=5' % (reverse('api-add-basket-item'), user_name_id)
    url = sign_url(url, key)
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.text)['result'] == 'success'
    assert BasketItem.objects.filter(amount=Decimal('81.22')).exists()

    # accept french notation if settings.LANGUAGE_CODE is 'fr-*'
    # (without that language code, comma decimals are rejected wherever they appear)
    url = '%s?amount=10,00&NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id)
    url = sign_url(url, key)
    resp = app.post_json(url, params=data, status=400)
    assert resp.json['err_desc'] == 'invalid value for "amount" in query string'
    data['amount'] = '1,10'
    url = '%s?amount=10.00&NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id)
    url = sign_url(url, key)
    resp = app.post_json(url, params=data, status=400)
    assert resp.json['err_desc'] == 'invalid value for "amount" in payload'
    data['amount'] = '1.10'
    data['extra'] = {'amount': '0,01'}
    resp = app.post_json(url, params=data, status=400)
    assert resp.json['err_desc'] == 'invalid value for "amount" in extra payload'

    # same comma-decimal values under 'fr-be': 10,00 + 1,10 + 0,01
    data['amount'] = '1,10'
    data['extra'] = {'amount': '0,01'}
    url = '%s?amount=10,00&NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id)
    url = sign_url(url, key)
    with override_settings(LANGUAGE_CODE='fr-be'):
        resp = app.post_json(url, params=data, status=200)
    assert resp.status_code == 200
    assert json.loads(resp.text)['result'] == 'success'
    assert BasketItem.objects.filter(amount=Decimal('11.11')).exists()

    # once other_regie is the default, an item added without regie_id goes to it
    other_regie.is_default = True
    other_regie.save()
    data['amount'] = []
    data['extra'] = {'amount': '22.23'}
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id)
    url = sign_url(url, settings.LINGO_API_SIGN_KEY)
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    response = json.loads(resp.text)
    assert response['result'] == 'success'
    payment_url = urllib.parse.urlparse(response['payment_url'])
    assert payment_url.path.startswith('/lingo/item/')
    assert payment_url.path.endswith('/pay')
    assert BasketItem.objects.filter(amount=Decimal('22.23')).exists()
    assert BasketItem.objects.filter(amount=Decimal('22.23'))[0].regie_id == other_regie.id

    # explicit regie, given by id; the whole `extra` dict is kept as request_data
    url = '%s?NameId=%s&regie_id=%s' % (reverse('api-add-basket-item'), user_name_id, regie.id)
    data['extra'] = {'amount': '22.24', 'foo': 'bar'}
    url = sign_url(url, settings.LINGO_API_SIGN_KEY)
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.text)['result'] == 'success'
    assert BasketItem.objects.filter(amount=Decimal('22.24')).exists()
    assert BasketItem.objects.filter(amount=Decimal('22.24'))[0].regie_id == regie.id
    assert BasketItem.objects.filter(amount=Decimal('22.24'))[0].request_data == data['extra']

    # explicit regie, given by slug
    url = '%s?NameId=%s&regie_id=%s' % (reverse('api-add-basket-item'), user_name_id, regie.slug)
    data['extra'] = {'amount': '13.67'}
    url = sign_url(url, settings.LINGO_API_SIGN_KEY)
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.text)['result'] == 'success'
    assert BasketItem.objects.filter(amount=Decimal('13.67')).exists()
    assert BasketItem.objects.filter(amount=Decimal('13.67'))[0].regie_id == regie.id

    # regie_id that matches nothing
    url = '%s?NameId=%s&orig=wcs&regie_id=%s' % (reverse('api-add-basket-item'), user_name_id, 'scarecrow')
    url = sign_url(url, key)
    resp = app.post_json(url, params=data, status=400)
    assert resp.json['err_desc'] == 'unknown regie'
def test_basket_item_with_capture_date(app, user, user_name_id, regie, basket_page, monkeypatch):
    """An item's capture date is stored and handed to the payment request."""
    add_item_url = '%s?NameId=%s' % (reverse('api-add-basket-item'), user_name_id)
    capture_date = timezone.now().date()
    payload = {'amount': 10, 'capture_date': capture_date.isoformat(), 'display_name': 'test item'}
    add_item_url = sign_url(add_item_url, settings.LINGO_API_SIGN_KEY)

    creation = app.post_json(add_item_url, params=payload)
    assert creation.status_code == 200
    assert BasketItem.objects.all()[0].capture_date == capture_date

    basket = login(app).get('/test_basket_cell/')

    # replace the eopayment request with a mock to inspect its arguments
    payment_request = mock.Mock(
        return_value=('orderid', eopayment.URL, 'http://dummy-payment.demo.entrouvert.com/')
    )
    monkeypatch.setattr(eopayment.Payment, 'request', payment_request)

    redirect = basket.form.submit()
    assert redirect.status_code == 302
    target = urllib.parse.urlparse(redirect.location)
    assert target.path == '/'
    assert target.hostname == 'dummy-payment.demo.entrouvert.com'

    payment_request.assert_called_once_with(
        Decimal(10),
        email=user.email,
        first_name=user.first_name,
        last_name=user.last_name,
        capture_date=capture_date,
        merchant_name='Compte Citoyen',
        items_info=[{'text': 'test item', 'amount': Decimal('10.00'), 'reference_id': ''}],
    )
def test_basket_items_extra_info(app, user, user_name_id, regie, basket_page, monkeypatch):
    """Item details and the configured title are passed to the payment request."""
    merchant_title = 'FooBar'
    posted_items = [
        {'display_name': 'face mask', 'amount': 10, 'reference_id': 1},
        {'display_name': 'face mask 2', 'amount': 15, 'reference_id': 2},
    ]
    expected_items_info = [
        {'text': 'face mask', 'amount': Decimal(10), 'reference_id': '1'},
        {'text': 'face mask 2', 'amount': Decimal(15), 'reference_id': '2'},
    ]

    add_item_url = sign_url(
        '%s?NameId=%s' % (reverse('api-add-basket-item'), user_name_id), settings.LINGO_API_SIGN_KEY
    )
    for posted in posted_items:
        creation = app.post_json(add_item_url, params=posted)
        assert creation.status_code == 200

    basket = login(app).get('/test_basket_cell/')

    # replace the eopayment request with a mock to inspect its arguments
    payment_request = mock.Mock(
        return_value=('orderid', eopayment.URL, 'http://dummy-payment.demo.entrouvert.com/')
    )
    monkeypatch.setattr(eopayment.Payment, 'request', payment_request)

    with override_settings(TEMPLATE_VARS={'global_title': merchant_title}):
        basket.form.submit()

    payment_request.assert_called_once_with(
        Decimal(25),
        email=user.email,
        first_name=user.first_name,
        last_name=user.last_name,
        merchant_name=merchant_title,
        items_info=expected_items_info,
    )
def test_basket_items_extra_info_no_basket(app, regie, basket_page, monkeypatch):
    """An item created without a user still forwards its email to eopayment."""
    # when no user is authenticated, it is still possible to pass an email to eopayment backend
    contact_email = 'test@entrouvert.com'
    payload = {'display_name': 'face mask', 'amount': 10, 'email': contact_email}
    add_item_url = sign_url(reverse('api-add-basket-item'), settings.LINGO_API_SIGN_KEY)

    creation = app.post_json(add_item_url, params=payload)
    assert creation.status_code == 200
    stored_item = BasketItem.objects.first()
    assert stored_item.user is None
    payment_url = creation.json['payment_url']

    # replace the eopayment request with a mock to inspect its arguments
    payment_request = mock.Mock(
        return_value=('orderid', eopayment.URL, 'http://dummy-payment.demo.entrouvert.com/')
    )
    monkeypatch.setattr(eopayment.Payment, 'request', payment_request)

    app.get(payment_url)
    payment_request.assert_called_once_with(
        Decimal(10),
        email=contact_email,
        first_name='',
        last_name='',
        items_info=[{'text': 'face mask', 'amount': Decimal(10), 'reference_id': ''}],
        merchant_name='Compte Citoyen',
    )
@pytest.mark.parametrize("invalid_capture_date", [8, '', 'not-a-date'])
def test_add_basket_capture_date_format(app, user_name_id, regie, invalid_capture_date):
    """A capture date that is not ``yyyy-mm-dd`` is answered with a 400."""
    add_item_url = sign_url(
        '%s?NameId=%s' % (reverse('api-add-basket-item'), user_name_id), settings.LINGO_API_SIGN_KEY
    )
    payload = {'amount': 10, 'display_name': 'test item', 'capture_date': invalid_capture_date}
    rejection = app.post_json(add_item_url, params=payload, status=400)
    assert rejection.json['err_desc'] == 'bad format for capture date, it should be yyyy-mm-dd'
def test_add_basket_item_with_remote_regie(app, user_name_id, remote_regie):
    """Adding a basket item to a remote regie is answered with a 400."""
    payload = {'amount': 10, 'display_name': 'test item'}
    add_item_url = sign_url(
        '%s?NameId=%s' % (reverse('api-add-basket-item'), user_name_id), settings.LINGO_API_SIGN_KEY
    )
    rejection = app.post_json(add_item_url, params=payload, status=400)
    assert rejection.json['err_desc'] == 'can not add a basket item to a remote regie'
def test_add_basket_item_without_display_name(app, user_name_id, regie):
    """A payload without ``display_name`` is answered with a 400."""
    payload = {'amount': '42', 'url': 'http://example.com/form/id/'}
    add_item_url = sign_url(
        '%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id),
        settings.LINGO_API_SIGN_KEY,
    )
    rejection = app.post_json(add_item_url, params=payload, status=400)
    assert 'missing display_name parameter' in rejection.text
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
def test_cant_pay_if_different_capture_date(mock_trigger_request, app, basket_page, regie, user):
    """Items with different capture dates cannot be paid together.

    Submitting the basket redirects back to the basket page with an error
    message, and no outgoing request is made.
    """
    capture1 = (timezone.now() + timedelta(days=1)).date()
    capture2 = (timezone.now() + timedelta(days=2)).date()
    items = {
        'item1': {
            'amount': '10.5',
            'capture_date': capture1.isoformat(),
            'source_url': 'http://example.org/item/1/',
        },
        'item2': {
            'amount': '42',
            'capture_date': capture2.isoformat(),
            'source_url': 'http://example.org/item/2/',
        },
    }
    # the created rows are only needed in the database; their pks are not used
    for subject, details in items.items():
        BasketItem.objects.create(user=user, regie=regie, subject=subject, **details)

    resp = login(app).get('/test_basket_cell/')
    resp = resp.form.submit()
    assert resp.status_code == 302
    assert urllib.parse.urlparse(resp.location).path == '/test_basket_cell/'
    resp = resp.follow()
    assert "Invalid grouping for basket items: different capture dates." in resp.text
    assert mock_trigger_request.call_count == 0
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
def test_pay_single_basket_item(mock_trigger_request, app, key, regie, user_name_id, john_doe):
    """Pay one basket item directly through the ``payment_url`` returned by the API.

    Walks through: reaching the payment URL while logged out and as another
    user, refusal when the regie has an extra fees URL, refusal when the
    item's trigger URL answers 404, then a complete payment via the dummy
    backend ending on the payment status view.
    """
    page = Page(title='xxx', slug='index', template_name='standard')
    page.save()
    cell = LingoBasketCell(page=page, placeholder='content', order=0)
    cell.save()
    amount = 12
    data = {'amount': amount, 'display_name': 'test amount', 'url': 'http://example.com/form/42/'}
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id)
    url = sign_url(url, key)
    resp = app.post_json(url, params=data)
    # check that an unpaid item exists in basket
    assert BasketItem.objects.filter(regie=regie, amount=amount, payment_date__isnull=True).exists()
    payment_data = resp.json
    payment_url = payment_data['payment_url']
    # check that payment is possible unconnected
    app.get(payment_url, status=302)
    assert mock_trigger_request.call_count == 1
    # and connected with another user (john.doe != admin)
    login(app, username='john.doe', password='john.doe')
    app.get(payment_url, status=302)
    assert mock_trigger_request.call_count == 2
    # forbid payment to regie with extra_fees_ws_url
    regie.extra_fees_ws_url = 'http://example.com/extra-fees'
    regie.save()
    app.reset()
    login(app)
    resp = app.get(payment_url, status=403)
    assert 'No item payment allowed as extra fees set.' in resp.text
    # restore the regie so the following steps are not refused for that reason
    regie.extra_fees_ws_url = ''
    regie.save()
    # forbid payment if payment trigger is impossible (404)
    app.reset()
    login(app)
    mock_trigger_request.reset_mock()
    mock_response = mock.Mock()
    mock_response.status_code = 404
    mock_trigger_request.return_value = mock_response
    resp = app.get(payment_url, status=302)
    assert mock_trigger_request.call_count == 1
    assert mock_trigger_request.call_args[0][0] == 'GET'
    assert mock_trigger_request.call_args[0][1].startswith('http://example.com/form/42/jump/trigger/paid')
    assert 'At least one item is not linked to a payable form.' in app.session['_messages']
    # now launch a good payment
    # (the trigger URL answers 403 this time and the payment goes through)
    mock_trigger_request.reset_mock()
    mock_response = mock.Mock()
    mock_response.status_code = 403
    mock_trigger_request.return_value = mock_response
    assert not Transaction.objects.filter(user__username='admin', status=0).exists()
    resp = app.get(payment_url, params={'next_url': 'http://example.net/form/id/'})
    assert mock_trigger_request.call_count == 1
    assert mock_trigger_request.call_args[0][0] == 'GET'
    assert mock_trigger_request.call_args[0][1].startswith('http://example.com/form/42/jump/trigger/paid')
    assert Transaction.objects.filter(user__username='admin', status=0).exists()
    # make sure the redirection is done to the payment backend
    assert resp.location.startswith('http://dummy-payment.demo.entrouvert.com/')
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)
    assert qs['amount'] == ['12.00']
    # simulate successful payment response from dummy backend
    data = {'transaction_id': qs['transaction_id'][0], 'ok': True, 'amount': qs['amount'][0], 'signed': True}
    # simulate payment service redirecting the user to /lingo/return/... (eopayment
    # dummy module put that URL in return_url query string parameter).
    resp = app.get(qs['return_url'][0], params=data)
    # check that item is paid
    item = BasketItem.objects.filter(regie=regie, amount=amount, payment_date__isnull=False).first()
    # check that user is redirected to the item payment status view
    assert_payment_status(resp.location, transaction_id=item.transaction_set.last().pk)
    assert Transaction.objects.filter(user__username='admin', status=eopayment.PAID).exists()
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
def test_pay_single_basket_item_another_user(mock_trigger_request, app, key, regie, user_name_id, john_doe):
    """An item added for ``admin`` can be paid by ``john.doe``.

    The resulting transaction is attached to the user who paid.
    """
    index_page = Page(title='xxx', slug='index', template_name='standard')
    index_page.save()
    LingoBasketCell(page=index_page, placeholder='content', order=0).save()

    amount = 12
    payload = {'amount': amount, 'display_name': 'test amount', 'url': 'http://example.com/form/id/'}
    add_item_url = sign_url(
        '%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id), key
    )
    creation = app.post_json(add_item_url, params=payload)

    # check that an unpaid item exists in basket
    assert BasketItem.objects.filter(regie=regie, amount=amount, payment_date__isnull=True).exists()
    payment_url = creation.json['payment_url']

    # and connected with another user (john.doe != admin)
    login(app, username='john.doe', password='john.doe')
    payer_transactions = {'user__username': 'john.doe'}
    assert not Transaction.objects.filter(status=0, **payer_transactions).exists()
    redirect = app.get(payment_url, params={'next_url': 'http://example.net/form/id/'})
    assert mock_trigger_request.call_count == 1
    assert Transaction.objects.filter(status=0, **payer_transactions).exists()

    # make sure the redirection is done to the payment backend
    assert redirect.location.startswith('http://dummy-payment.demo.entrouvert.com/')
    backend_query = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)
    assert backend_query['amount'] == ['12.00']

    # simulate successful payment response from dummy backend
    backend_answer = {
        'transaction_id': backend_query['transaction_id'][0],
        'ok': True,
        'amount': backend_query['amount'][0],
        'signed': True,
    }
    # simulate payment service redirecting the user to /lingo/return/... (eopayment
    # dummy module put that URL in return_url query string parameter).
    returned = app.get(backend_query['return_url'][0], params=backend_answer)

    # check that item is paid
    paid_item = BasketItem.objects.filter(regie=regie, amount=amount, payment_date__isnull=False).first()

    # check that user is redirected to the item payment status view
    assert_payment_status(returned.location, transaction_id=paid_item.transaction_set.last().pk)
    assert Transaction.objects.filter(status=eopayment.PAID, **payer_transactions).exists()
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
def test_pay_multiple_regies(mocked_trigger_request, app, key, regie, user_name_id):
    """Each regie gets its own basket form, paid separately with its own total."""

    def amount_sent_to_backend(location):
        # the payment backend URL carries the amount in its query string
        return urllib.parse.parse_qs(urllib.parse.urlparse(location).query)['amount']

    # fill the baskets of both regies by replaying the add-item scenario
    test_add_amount_to_basket(app, key, regie, user_name_id)

    page = Page(title='xxx', slug='test_basket_cell', template_name='standard')
    page.save()
    LingoBasketCell(page=page, placeholder='content', order=0).save()

    basket = login(app).get(page.get_online_url())
    assert mocked_trigger_request.call_count == 0
    first_payment = basket.forms[0].submit()
    assert mocked_trigger_request.call_count == 6  # trigger/paid verification, first regie
    assert first_payment.location.startswith('http://dummy-payment.demo.entrouvert.com/')
    assert amount_sent_to_backend(first_payment.location) == ['234.46']

    basket = login(app).get(page.get_online_url())
    mocked_trigger_request.reset_mock()
    second_payment = basket.forms[1].submit()
    assert mocked_trigger_request.call_count == 1  # trigger/paid verification, other regie
    assert amount_sent_to_backend(second_payment.location) == ['22.23']
def test_pay_as_anonymous_user(app, key, regie, user_name_id):
    """Submitting the basket form after losing the session asks to log in."""
    # fill the basket by replaying the add-item scenario
    test_add_amount_to_basket(app, key, regie, user_name_id)

    page = Page(title='xxx', slug='test_basket_cell', template_name='standard')
    page.save()
    LingoBasketCell(page=page, placeholder='content', order=0).save()

    basket = login(app).get(page.get_online_url())
    # drop the session cookie so the form is submitted without a session
    app.cookiejar.clear(domain='testserver.local', path='/', name='sessionid')
    after_submit = basket.forms[0].submit().follow()
    assert 'Payment requires to be logged in.' in after_submit.text
def test_cancel_basket_item(app, key, regie, user_name_id):
    """Exercise the signed ``api-remove-basket-item`` endpoint.

    Two items are added, then removal is attempted with every invalid input
    the endpoint reports (missing/invalid/unknown item id, bad JSON, missing
    or unknown user, item owned by someone else) before both items are
    cancelled for real, with and without ``notify``.
    """
    # first item (amount 42)
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id)
    url = sign_url(url, key)
    data = {
        'amount': 42,
        'display_name': 'test amount',
        'url': 'http://example.com/form/id/',
        'notify': 'true',
    }
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.text)['result'] == 'success'
    assert BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists()
    basket_item_id = json.loads(resp.text)['id']

    # second item (amount 21)
    data = {'amount': 21, 'display_name': 'test amount', 'url': 'http://example.net/'}
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.text)['result'] == 'success'
    assert BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists()
    assert BasketItem.objects.filter(amount=21, cancellation_date__isnull=True).exists()
    basket_item_id_2 = json.loads(resp.text)['id']

    # no basket_item_id in the payload
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_name_id)
    url = sign_url(url, key)
    data = {'notify': 'true'}
    resp = app.post_json(url, params=data, status=400)
    assert resp.json['err_desc'] == 'missing basket_item_id parameter'

    # empty body instead of JSON (`data` is not sent here)
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_name_id)
    url = sign_url(url, key)
    data = {'notify': 'true'}
    resp = app.post(url, params='', status=400)
    assert 'bad json' in resp.json['err_desc']

    # basket_item_id that is not a valid id
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_name_id)
    url = sign_url(url, key)
    data = {'basket_item_id': 'eggs', 'notify': 'true'}
    resp = app.post_json(url, params=data, status=400)
    assert resp.json['err_desc'] == 'invalid basket_item_id'

    # basket_item_id that matches no item
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_name_id)
    url = sign_url(url, key)
    data = {'basket_item_id': 0, 'notify': 'true'}
    resp = app.post_json(url, params=data, status=400)
    assert resp.json['err_desc'] == 'unknown basket item'

    # no NameId in the query string
    url = '%s?orig=wcs' % (reverse('api-remove-basket-item'))
    url = sign_url(url, key)
    data = {'basket_item_id': basket_item_id, 'notify': 'true'}
    resp = app.post_json(url, params=data, status=400)
    assert resp.json['err_desc'] == 'no user specified'

    # NameId that matches no user
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-remove-basket-item'), 'unknown@example.com')
    url = sign_url(url, key)
    data = {'basket_item_id': basket_item_id, 'notify': 'true'}
    resp = app.post_json(url, params=data, status=400)
    assert resp.json['err_desc'] == 'unknown user'

    # an existing user who is not the owner of the item
    other_user, _ = User.objects.get_or_create(email='hop@example.com')
    other_user_name_id = uuid.uuid4()
    UserSAMLIdentifier.objects.get_or_create(user=other_user, name_id=other_user_name_id)
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-remove-basket-item'), other_user_name_id)
    url = sign_url(url, key)
    data = {'basket_item_id': basket_item_id, 'notify': 'true'}
    resp = app.post_json(url, params=data, status=400)
    assert resp.json['err_desc'] == 'user does not own the basket item'

    # real cancellation with notify: the item's "cancelled" trigger URL is POSTed
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
        url = '%s?NameId=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_name_id)
        url = sign_url(url, key)
        data = {'basket_item_id': basket_item_id, 'notify': 'true'}
        resp = app.post_json(url, params=data)
        assert request.call_args[0] == ('POST', 'http://example.com/form/id/jump/trigger/cancelled')
        assert not BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists()
        assert BasketItem.objects.filter(amount=21, cancellation_date__isnull=True).exists()

    # real cancellation without notify: no outgoing request at all
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
        url = '%s?NameId=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_name_id)
        url = sign_url(url, key)
        data = {'basket_item_id': basket_item_id_2}
        resp = app.post_json(url, params=data)
        assert request.call_count == 0
        assert not BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists()
        assert not BasketItem.objects.filter(amount=21, cancellation_date__isnull=True).exists()

    # cancelling the first item a second time is refused
    url = '%s?NameId=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_name_id)
    url = sign_url(url, key)
    data = {'basket_item_id': basket_item_id}
    resp = app.post_json(url, params=data, status=400)
    assert resp.json['err_desc'] == 'basket item already cancelled'
def test_cancel_basket_item_from_cell(app, key, regie, user_name_id):
    """Check the ``lingo-cancel-item`` view in its main situations.

    Scenarios, in order: anonymous access, a successful cancellation once
    logged in, an item added with ``cancellable=no``, and an item that was
    added for another user.
    """
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'
    add_item_path = reverse('api-add-basket-item')

    basket_cell_page = Page(title='xxx', slug='test_basket_cell', template_name='standard')
    basket_cell_page.save()
    LingoBasketCell(page=basket_cell_page, placeholder='content', order=0).save()

    # add a first item through the signed API
    signed_url = sign_url('%s?NameId=%s&orig=wcs' % (add_item_path, user_name_id), key)
    payload = {'amount': 42, 'display_name': 'test amount', 'url': 'http://example.org/testitem/'}
    resp = app.post_json(signed_url, params=payload)
    assert resp.status_code == 200
    assert json.loads(resp.text)['result'] == 'success'
    assert BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists()
    basket_item_id = json.loads(resp.text)['id']
    cancel_url = reverse('lingo-cancel-item', kwargs={'pk': basket_item_id})

    # check while not logged in
    app.get(cancel_url, status=404)
    assert BasketItem.objects.filter(id=basket_item_id).exists()

    # check a successful case
    app = login(app)
    with mock.patch(requests_target) as request:
        confirmation = app.get(cancel_url)
        confirmation.form.submit()
        notified_url = request.call_args[0][1]
        assert notified_url.startswith('http://example.org/testitem/jump/trigger/cancelled')
    assert BasketItem.objects.filter(id=basket_item_id, cancellation_date__isnull=False).exists()

    # check removal of an item that is not cancellable
    signed_url = sign_url('%s?NameId=%s&cancellable=no&orig=wcs' % (add_item_path, user_name_id), key)
    payload = {'amount': 21, 'display_name': 'test amount', 'url': 'http://example.org/testitem/'}
    resp = app.post_json(signed_url, params=payload)
    basket_item2_id = json.loads(resp.text)['id']
    assert resp.status_code == 200
    assert json.loads(resp.text)['result'] == 'success'
    assert BasketItem.objects.filter(amount=21, cancellation_date__isnull=True).exists()
    confirmation = app.get(reverse('lingo-cancel-item', kwargs={'pk': basket_item2_id}))
    resp = confirmation.form.submit().follow()
    assert 'This item cannot be removed.' in resp.text

    # check removal of the item of another user
    other_user, _ = User.objects.get_or_create(email='hop@example.com')
    other_user_name_id = uuid.uuid4()
    UserSAMLIdentifier.objects.get_or_create(user=other_user, name_id=other_user_name_id)
    signed_url = sign_url('%s?NameId=%s&orig=wcs' % (add_item_path, other_user_name_id), key)
    payload = {'amount': 42, 'display_name': 'test amount', 'url': 'http://example.org/testitem/'}
    resp = app.post_json(signed_url, params=payload)
    assert resp.status_code == 200
    assert json.loads(resp.text)['result'] == 'success'
    foreign_cancel_url = reverse('lingo-cancel-item', kwargs={'pk': json.loads(resp.text)['id']})
    app.get(foreign_cancel_url, status=404)
    app.post(foreign_cancel_url, status=403)
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_payment_callback(app, basket_page, regie, user, with_payment_backend):
    """Pay two successive baskets, the backend calling back by GET then POST.

    After each callback the transaction status is expected to be 3; the
    return view is visited last and must lead to the success message.
    """
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'
    Page(title='xxx', slug='index', template_name='standard').save()

    def submit_basket(amount):
        """Add an item worth *amount*, submit the basket, return the redirect query."""
        BasketItem.objects.create(
            user=user, regie=regie, subject='test_item', amount=amount, source_url='http://example.org/test/id/'
        )
        basket_resp = login(app).get(basket_page.get_online_url())
        with mock.patch(requests_target) as trigger_request:
            redirect = basket_resp.form.submit()
        assert trigger_request.call_count == 1
        assert redirect.status_code == 302
        return urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)

    # first basket
    qs = submit_basket('10.5')
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '10.50'

    # call callback with GET
    callback_url = get_url(with_payment_backend, 'lingo-callback', regie)
    with mock.patch(requests_target) as request:
        get_resp = app.get(callback_url, params=data)
    notified_url = request.call_args[0][1]
    assert notified_url.startswith('http://example.org/test/id/jump/trigger/paid')
    assert get_resp.status_code == 200
    assert Transaction.objects.get(order_id=transaction_id).status == 3

    # second basket
    qs = submit_basket('11.5')
    transaction_id = qs['transaction_id'][0]
    return_url = qs['return_url'][0]
    data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '11.50'

    # call callback with POST
    with mock.patch(requests_target):
        post_resp = app.post(callback_url, params=data)
    assert post_resp.status_code == 200
    assert Transaction.objects.get(order_id=transaction_id).status == 3

    # call return view
    get_resp = app.get(return_url, params=data)
    assert get_resp.status_code == 302
    status_page = app.get(get_resp['Location'])
    assert 'Your payment has been succesfully registered.' in status_page.text
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_payment_callback_no_regie(app, basket_page, regie, user, with_payment_backend):
    """Check the remote requests issued when submitting a basket and on a GET callback.

    Submitting the basket is expected to issue one GET on the item trigger
    URL, while the payment callback is expected to issue a POST on it.
    """
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'

    # first item
    BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='10.5', source_url='http://example.org/test/1/'
    )
    basket_resp = login(app).get(basket_page.get_online_url())
    with mock.patch(requests_target) as trigger_request:
        redirect = basket_resp.form.submit()
    assert trigger_request.call_count == 1
    trigger_args = trigger_request.call_args[0]
    assert trigger_args[0] == 'GET'
    assert trigger_args[1].startswith('http://example.org/test/1/jump/trigger/paid')
    assert redirect.status_code == 302
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '10.50'

    # call callback with GET
    callback_url = get_url(with_payment_backend, 'lingo-callback', regie)
    with mock.patch(requests_target) as request:
        get_resp = app.get(callback_url, params=data)
    notify_args = request.call_args[0]
    assert notify_args[0] == 'POST'
    assert notify_args[1].startswith('http://example.org/test/1/jump/trigger/paid')
    assert get_resp.status_code == 200
    assert Transaction.objects.get(order_id=transaction_id).status == 3

    # second item: only the submission of the basket is checked
    BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='11.5', source_url='http://example.org/test/2/'
    )
    basket_resp = login(app).get(basket_page.get_online_url())
    with mock.patch(requests_target) as trigger_request:
        redirect = basket_resp.form.submit()
    assert trigger_request.call_count == 1
    trigger_args = trigger_request.call_args[0]
    assert trigger_args[0] == 'GET'
    assert trigger_args[1].startswith('http://example.org/test/2/jump/trigger/paid')
    assert redirect.status_code == 302
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '11.50'
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_payment_return_without_query_string(app, basket_page, regie, user, with_payment_backend):
    """Visit the return view with no data after the callback settled the payment.

    When the backend declares ``has_empty_response``, its ``response`` method
    must be called with an empty query string plus hints taken from the
    transaction; otherwise ``response`` is made to raise.
    """
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'
    Page(title='xxx', slug='index', template_name='standard').save()
    BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='10.5', source_url='http://example.org/test/3/'
    )
    basket_resp = login(app).get(basket_page.get_online_url())
    with mock.patch(requests_target) as trigger_request:
        redirect = basket_resp.form.submit()
    assert trigger_request.call_count == 1
    trigger_args = trigger_request.call_args[0]
    assert trigger_args[0] == 'GET'
    assert trigger_args[1].startswith('http://example.org/test/3/jump/trigger/paid')
    assert redirect.status_code == 302
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)
    return_url = qs['return_url'][0]
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True}

    # payment status is obtained through callback
    callback_url = get_url(with_payment_backend, 'lingo-callback', regie)
    with mock.patch(requests_target):
        app.get(callback_url, params=data)
    transaction = Transaction.objects.get(order_id=transaction_id)
    assert transaction.status == 3

    # then return view is called without any data, which should be expected by the backend
    empty_response_supported = regie.eopayment.has_empty_response
    if not empty_response_supported:
        with mock.patch.object(eopayment.dummy.Payment, 'response', autospec=True, side_effect=Exception):
            get_resp = app.get(return_url)
    else:
        with mock.patch.object(eopayment.dummy.Payment, 'response', autospec=True) as mock_response:
            mock_response.return_value = eopayment.common.PaymentResponse(
                result=transaction.status, order_id=transaction.order_id
            )
            get_resp = app.get(return_url)
            mock_response.assert_called_once_with(
                mock.ANY,
                '',
                redirect=True,
                order_id_hint=transaction.order_id,
                order_status_hint=transaction.status,
            )
    assert get_resp.status_code == 302
    status_page = app.get(get_resp['Location'])
    if empty_response_supported:
        assert 'Your payment has been succesfully registered.' in status_page.text
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_nonexisting_transaction(app, regie, user, with_payment_backend):
    """A callback naming an unknown transaction must be answered with a 400."""
    logged_app = login(app)
    callback_url = get_url(with_payment_backend, 'lingo-callback', regie)
    # call callback with GET
    logged_app.get(
        callback_url,
        params={'transaction_id': 'unknown', 'signed': True, 'amount': '23', 'ok': True},
        status=400,
    )
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_payment_callback_waiting(app, basket_page, regie, user, with_payment_backend):
    """Receive a WAITING callback then a PAID one for the same transaction.

    While waiting, the item carries a waiting date but no payment date; in
    both states it must not be listed among the items left to pay.
    """
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'
    item = BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='10.5', source_url='http://example.org/test/4/'
    )
    basket_resp = login(app).get(basket_page.get_online_url())
    with mock.patch(requests_target) as trigger_request:
        redirect = basket_resp.form.submit()
    assert trigger_request.call_count == 1
    trigger_args = trigger_request.call_args[0]
    assert trigger_args[0] == 'GET'
    assert trigger_args[1].startswith('http://example.org/test/4/jump/trigger/paid')
    assert redirect.status_code == 302
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'waiting': True}
    assert data['amount'] == '10.50'

    # callback with WAITING state
    callback_url = get_url(with_payment_backend, 'lingo-callback', regie)
    resp = app.get(callback_url, params=data)
    assert resp.status_code == 200
    assert Transaction.objects.get(order_id=transaction_id).status == eopayment.WAITING
    waiting_item = BasketItem.objects.get(id=item.id)
    assert waiting_item.waiting_date
    assert not waiting_item.payment_date
    assert BasketItem.get_items_to_be_paid(user).count() == 0

    # callback with PAID state
    data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True}
    with mock.patch(requests_target) as request:
        resp = app.get(callback_url, params=data)
    assert resp.status_code == 200
    notified_url = request.call_args[0][1]
    assert notified_url.startswith('http://example.org/test/4/jump/trigger/paid')
    assert Transaction.objects.get(order_id=transaction_id).status == eopayment.PAID
    assert BasketItem.objects.get(id=item.id).payment_date
    assert BasketItem.get_items_to_be_paid(user).count() == 0
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_payment_no_callback_just_return(caplog, app, basket_page, regie, user, with_payment_backend):
    """Settle a payment through the return view only, with no callback.

    An unsigned answer and an answer lacking the transaction id must leave
    the transaction unpaid; a signed answer marks it as paid and notifies
    the item source URL.
    """
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'
    BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='10.5', source_url='http://example.org/test/5/'
    )
    basket_resp = login(app).get(basket_page.get_online_url())
    with mock.patch(requests_target) as trigger_request:
        redirect = basket_resp.form.submit()
    assert trigger_request.call_count == 1
    trigger_args = trigger_request.call_args[0]
    assert trigger_args[0] == 'GET'
    assert trigger_args[1].startswith('http://example.org/test/5/jump/trigger/paid')
    assert redirect.status_code == 302
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '10.50'
    return_url = qs['return_url'][0]

    # call return with unsigned POST
    with check_log(caplog, 'Received unsigned payment response'), mock.patch(requests_target) as request:
        get_resp = app.post(return_url, params=data)
        assert request.call_count == 0
    assert get_resp.status_code == 302
    assert urllib.parse.urlparse(get_resp['location']).path == '/lingo/payment-status'
    assert Transaction.objects.get(order_id=transaction_id).status == 0  # not paid

    # call return with missing data
    with check_log(caplog, 'missing transaction_id.*amount=10'):
        baddata = data.copy()
        del baddata['transaction_id']
        with mock.patch(requests_target):
            get_resp = app.post(return_url, params=baddata)
        assert get_resp.status_code == 302
        status_page = app.get(get_resp['Location'])
        assert 'Your payment has been succesfully registered.' not in status_page.text
        assert 'the payment service failed to provide a correct answer.' in status_page.text
    assert Transaction.objects.get(order_id=transaction_id).status == 0  # not paid

    # call return with signed POST
    data['signed'] = True
    with mock.patch(requests_target) as request:
        get_resp = app.post(return_url, params=data)
        notified_url = request.call_args[0][1]
        assert notified_url.startswith('http://example.org/test/5/jump/trigger/paid')

    # redirect to payment status
    assert get_resp.status_code == 302
    assert urllib.parse.urlparse(get_resp.url).path.startswith('/lingo/payment-status')
    status_page = get_resp.follow()
    assert 'Your payment has been succesfully registered.' in status_page.text
    next_link = status_page.html.find('a', {'id': 'next-url'})
    assert urllib.parse.urlparse(next_link['href']).path == '/test_basket_cell/'
    assert Transaction.objects.get(order_id=transaction_id).status == eopayment.PAID
def test_transaction_expiration():
    """A pending transaction started two hours ago is expected to become EXPIRED.

    A second pending transaction that has just been created must keep its
    status.
    """
    stale = Transaction(status=0)
    stale.save()
    # backdate after the first save (presumably start_date is filled in on
    # creation -- confirm against the model)
    stale.start_date = timezone.now() - timedelta(hours=2)
    stale.save()

    fresh = Transaction(status=0)
    fresh.save()

    apps.get_app_config('lingo').update_transactions()

    assert Transaction.objects.get(id=stale.id).status == EXPIRED
    assert Transaction.objects.get(id=fresh.id).status == 0
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
def test_transaction_retry(mock_request, remote_regie):
    """Check that pending remote items of a paid transaction are retried.

    With a transaction started three days ago the pending list is expected
    to be emptied; with one started four days ago it must stay untouched.
    """
    transaction = Transaction.objects.create(
        status=eopayment.PAID, regie=remote_regie, end_date=timezone.now(), to_be_paid_remote_items='42,35'
    )
    transaction.start_date = timezone.now() - timedelta(days=3)
    transaction.save()

    # every remote request is answered with the same invoice description
    invoice_reply = mock.Mock()
    invoice_reply.status_code = 200
    invoice_reply.json.return_value = {
        'err': 0,
        'data': {
            'created': '2020-11-23',
            'pay_limit_date': '2021-11-23',
            'total_amount': '123.45',
            'amount': '123.45',
        },
    }
    mock_request.return_value = invoice_reply

    lingo_config = apps.get_app_config('lingo')
    lingo_config.update_transactions()
    transaction.refresh_from_db()
    assert transaction.to_be_paid_remote_items is None

    # too old
    transaction.to_be_paid_remote_items = '42,35'
    transaction.start_date = timezone.now() - timedelta(days=4)
    transaction.save()
    lingo_config.update_transactions()
    transaction.refresh_from_db()
    assert transaction.to_be_paid_remote_items == '42,35'
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
def test_transaction_retry_failure(mock_request, remote_regie):
    """Check which remote invoices stay queued after each kind of failure.

    Every scenario starts with invoices 42 and 35 pending on a paid
    transaction and feeds ``update_transactions()`` a scripted sequence of
    remote answers. Requests are consumed in the order: get invoice 42, pay
    invoice 42, get invoice 35, pay invoice 35; an invoice whose request
    failed does not consume its remaining requests.

    Expected outcomes: connection errors, unexpected exceptions, 5xx answers
    and ``err`` payloads keep the invoice queued for a later retry, while a
    4xx answer drops it from the queue.
    """
    transaction = Transaction.objects.create(
        status=eopayment.PAID, regie=remote_regie, end_date=timezone.now(), to_be_paid_remote_items='42,35'
    )
    appconfig = apps.get_app_config('lingo')

    mock_json_item = mock.Mock(status_code=200)
    mock_json_item.json.return_value = {
        'err': 0,
        'data': {
            'created': '2020-11-23',
            'pay_limit_date': '2021-11-23',
            'total_amount': '123.45',
            'amount': '123.45',
        },
    }
    mock_json_paid = mock.Mock(status_code=200)
    mock_json_paid.json.return_value = {'err': 0}
    mock_5xx = mock.Mock(status_code=500)
    mock_4xx = mock.Mock(status_code=400)
    mock_err = mock.Mock(status_code=200)
    # the payload must be set through return_value: any other attribute name
    # is silently accepted by Mock and json() would then return a bare Mock
    mock_err.json.return_value = {'err': 1}

    assert transaction.items.count() == 0

    # error on get invoice
    mock_request.side_effect = [
        ConnectionError('where is my hostname?'),  # get invoice 42
        mock_json_item,  # get invoice 35
        mock_json_paid,  # pay invoice 35
    ]
    appconfig.update_transactions()
    transaction.refresh_from_db()
    assert transaction.items.count() == 1  # only 35 was found
    assert set(transaction.items.values_list('remote_item_id', flat=True)) == {'35'}
    assert transaction.to_be_paid_remote_items == '42'  # retry for first one

    # error on pay invoice
    Transaction.objects.update(to_be_paid_remote_items='42,35')
    mock_request.side_effect = [
        mock_json_item,  # get invoice 42
        ConnectionError('where is my hostname?'),  # pay invoice 42
        mock_json_item,  # get invoice 35
        mock_json_paid,  # pay invoice 35
    ]
    appconfig.update_transactions()
    transaction.refresh_from_db()
    assert transaction.items.count() == 2  # both were updated now that get_invoice worked for 42
    assert set(transaction.items.values_list('remote_item_id', flat=True)) == {'35', '42'}
    assert transaction.to_be_paid_remote_items == '42'  # retry for first one

    # unknown error on get invoice
    Transaction.objects.update(to_be_paid_remote_items='42,35')
    mock_request.side_effect = [
        Exception,  # get invoice 42
        mock_json_item,  # get invoice 35
        mock_json_paid,  # pay invoice 35
    ]
    appconfig.update_transactions()
    transaction.refresh_from_db()
    assert transaction.to_be_paid_remote_items == '42'  # retry for first one

    # unknown error on pay invoice
    Transaction.objects.update(to_be_paid_remote_items='42,35')
    mock_request.side_effect = [
        mock_json_item,  # get invoice 42
        Exception,  # pay invoice 42
        mock_json_item,  # get invoice 35
        mock_json_paid,  # pay invoice 35
    ]
    appconfig.update_transactions()
    transaction.refresh_from_db()
    assert transaction.to_be_paid_remote_items == '42'  # retry for first one

    # 5xx on get invoice
    Transaction.objects.update(to_be_paid_remote_items='42,35')
    mock_request.side_effect = [
        mock_5xx,  # get invoice 42
        mock_json_item,  # get invoice 35
        mock_json_paid,  # pay invoice 35
    ]
    appconfig.update_transactions()
    transaction.refresh_from_db()
    assert transaction.to_be_paid_remote_items == '42'  # retry for first one

    # 5xx on pay invoice
    Transaction.objects.update(to_be_paid_remote_items='42,35')
    mock_request.side_effect = [
        mock_json_item,  # get invoice 42
        mock_5xx,  # pay invoice 42
        mock_json_item,  # get invoice 35
        mock_json_paid,  # pay invoice 35
    ]
    appconfig.update_transactions()
    transaction.refresh_from_db()
    assert transaction.to_be_paid_remote_items == '42'  # retry for first one

    # err on get invoice
    Transaction.objects.update(to_be_paid_remote_items='42,35')
    mock_request.side_effect = [
        mock_err,  # get invoice 42
        mock_json_item,  # get invoice 35
        mock_json_paid,  # pay invoice 35
    ]
    appconfig.update_transactions()
    transaction.refresh_from_db()
    assert transaction.to_be_paid_remote_items == '42'  # retry for first one

    # err on pay invoice
    Transaction.objects.update(to_be_paid_remote_items='42,35')
    mock_request.side_effect = [
        mock_json_item,  # get invoice 42
        mock_err,  # pay invoice 42
        mock_json_item,  # get invoice 35
        mock_json_paid,  # pay invoice 35
    ]
    appconfig.update_transactions()
    transaction.refresh_from_db()
    assert transaction.to_be_paid_remote_items == '42'  # retry for first one

    # 4xx on get invoice
    Transaction.objects.update(to_be_paid_remote_items='42,35')
    mock_request.side_effect = [
        mock_4xx,  # get invoice 42
        mock_5xx,  # get invoice 35
    ]
    appconfig.update_transactions()
    transaction.refresh_from_db()
    assert transaction.to_be_paid_remote_items == '35'  # retry for the second only

    # 4xx on pay invoice
    Transaction.objects.update(to_be_paid_remote_items='42,35')
    mock_request.side_effect = [
        mock_json_item,  # get invoice 42
        mock_4xx,  # pay invoice 42
        mock_5xx,  # get invoice 35
    ]
    appconfig.update_transactions()
    transaction.refresh_from_db()
    assert transaction.to_be_paid_remote_items == '35'  # retry for the second only
def test_transaction_validate(app, key, regie, user):
    """Exercise the transaction validation API.

    Covers an unsigned call (403), an unknown transaction (404), a successful
    validation recording one operation, and a backend failure that reports
    ``err == 1`` without recording a new operation.
    """
    endpoint = reverse('api-validate-transaction')
    paid = Transaction(regie=regie, bank_data={'bank': 'data'}, amount=12, status=eopayment.PAID)
    paid.save()

    # unsigned request
    app.post_json(endpoint + '?amount=10&transaction_id=0', params={}, status=403)

    # signed request naming a transaction that does not exist
    unknown_url = sign_url(endpoint + '?amount=10&transaction_id=0&orig=wcs', key)
    app.post_json(unknown_url, params={}, status=404)

    # signed request on the existing transaction
    valid_url = sign_url(endpoint + '?amount=10&transaction_id=%s&orig=wcs' % paid.id, key)
    resp = app.post_json(valid_url, params={})
    assert json.loads(resp.text)['err'] == 0
    operations = TransactionOperation.objects.filter(transaction=paid)
    assert len(operations) == 1
    assert operations[0].amount == 10

    # the backend refuses the validation
    with mock.patch.object(eopayment.dummy.Payment, 'validate', autospec=True) as mock_validate:
        mock_validate.side_effect = eopayment.ResponseError
        valid_url = sign_url(endpoint + '?amount=10&transaction_id=%s&orig=wcs' % paid.id, key)
        resp = app.post_json(valid_url, params={})
        assert json.loads(resp.text)['err'] == 1
    assert TransactionOperation.objects.filter(transaction=paid).count() == 1
def test_transaction_cancel(app, key, regie, user):
    """Exercise the transaction cancellation API.

    Covers an unsigned call (403), an unknown transaction (404), a successful
    cancellation recording one operation, and a backend failure that reports
    ``err == 1`` without recording a new operation.
    """
    endpoint = reverse('api-cancel-transaction')
    paid = Transaction(regie=regie, bank_data={'bank': 'data'}, amount=12, status=eopayment.PAID)
    paid.save()

    # unsigned request
    app.post_json(endpoint + '?amount=10&transaction_id=0&orig=wcs', params={}, status=403)

    # signed request naming a transaction that does not exist
    unknown_url = sign_url(endpoint + '?amount=10&transaction_id=0&orig=wcs', key)
    app.post(unknown_url, params={}, status=404)

    # signed request on the existing transaction
    valid_url = sign_url(endpoint + '?amount=10&transaction_id=%s&orig=wcs' % paid.id, key)
    resp = app.post_json(valid_url, params={})
    assert json.loads(resp.text)['err'] == 0
    operations = TransactionOperation.objects.filter(transaction=paid)
    assert len(operations) == 1
    assert operations[0].amount == 10

    # the backend refuses the cancellation
    with mock.patch.object(eopayment.dummy.Payment, 'cancel', autospec=True) as mock_cancel:
        mock_cancel.side_effect = eopayment.ResponseError
        valid_url = sign_url(endpoint + '?amount=10&transaction_id=%s&orig=wcs' % paid.id, key)
        resp = app.post_json(valid_url, params={})
        assert json.loads(resp.text)['err'] == 1
    assert TransactionOperation.objects.filter(transaction=paid).count() == 1
def test_extra_fees(app, basket_page, key, regie, user_name_id):
    """Check that the extra fee item follows the content of the basket.

    The regie points at an extra-fees web service whose answers are mocked;
    each basket change or payment attempt is expected to replace the extra
    fee item with the amount announced by the latest answer.
    """
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'

    def extra_fees_reply(fee):
        """Build the mocked web service answer announcing a single fee of *fee*."""
        reply = mock.Mock()
        reply.status_code = 200
        reply.json.return_value = {'err': 0, 'data': [{'subject': 'Extra Fees', 'amount': fee}]}
        return reply

    regie.extra_fees_ws_url = 'http://www.example.net/extra-fees'
    regie.save()

    amount = 42
    data = {'amount': amount, 'display_name': 'test amount'}

    # first item: a fee of 5 is attached to the basket
    with mock.patch(requests_target) as request:
        request.return_value = extra_fees_reply('5')
        url = sign_url('%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id), key)
        resp = app.post_json(url, params=data)
        assert resp.status_code == 200
        assert json.loads(resp.text)['result'] == 'success'
        assert BasketItem.objects.filter(amount=amount).exists()
        assert BasketItem.objects.filter(amount=amount)[0].regie_id == regie.id
        assert BasketItem.objects.filter(amount=5, extra_fee=True).exists()
        assert BasketItem.objects.filter(amount=5, extra_fee=True)[0].regie_id == regie.id

    # second item: the fee of 5 gives way to a fee of 7
    with mock.patch(requests_target) as request:
        request.return_value = extra_fees_reply('7')
        data['amount'] = 43
        url = sign_url('%s?NameId=%s&orig=wcs' % (reverse('api-add-basket-item'), user_name_id), key)
        resp = app.post_json(url, params=data)
        assert request.call_args[0] == ('POST', 'http://www.example.net/extra-fees')
        assert len(json.loads(request.call_args[1]['data'])['data']) == 2
        assert resp.status_code == 200
        assert json.loads(resp.text)['result'] == 'success'
        assert not BasketItem.objects.filter(amount=5, extra_fee=True).exists()
        assert BasketItem.objects.filter(amount=7, extra_fee=True).exists()

    # removing the second item: the fee of 7 gives way to a fee of 4
    with mock.patch(requests_target) as request:
        request.return_value = extra_fees_reply('4')
        url = sign_url('%s?NameId=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_name_id), key)
        data = {'basket_item_id': BasketItem.objects.get(amount=43).id}
        resp = app.post_json(url, params=data)
        assert resp.status_code == 200
        assert not BasketItem.objects.filter(amount=7, extra_fee=True).exists()
        assert BasketItem.objects.filter(amount=4, extra_fee=True).exists()

    # test payment
    app = login(app)
    with mock.patch(requests_target) as request:
        request.return_value = extra_fees_reply('2')
        basket_resp = login(app).get(basket_page.get_online_url())
        redirect = basket_resp.form.submit()
        assert redirect.status_code == 302
        qs = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)
        transaction_id = qs['transaction_id'][0]
        data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True}
        assert data['amount'] == '44.00'

    # test again, without specifying a regie
    with mock.patch(requests_target) as request:
        request.return_value = extra_fees_reply('3')
        basket_resp = login(app).get(basket_page.get_online_url())
        redirect = basket_resp.form.submit()
        assert redirect.status_code == 302
        qs = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)
        transaction_id = qs['transaction_id'][0]
        data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True}
        assert data['amount'] == '45.00'

        # call callback with GET
        callback_url = reverse('lingo-callback', kwargs={'regie_pk': regie.id})
        resp = app.get(callback_url, params=data)
        assert resp.status_code == 200
        assert Transaction.objects.get(order_id=transaction_id).status == 3
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_payment_callback_error(app, basket_page, regie, user, with_payment_backend):
    """Check that a failed payment notification is retried by ``notify_payments``.

    The callback marks the item as paid even though the remote notification
    answers with an error; the notification date is only expected once the
    payment date is one hour old and the retry succeeds.
    """
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'
    item = BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='10.5', source_url='http://example.org/test/6/'
    )
    basket_resp = login(app).get(basket_page.get_online_url())
    with mock.patch(requests_target) as trigger_request:
        redirect = basket_resp.form.submit()
    assert trigger_request.call_count == 1
    trigger_args = trigger_request.call_args[0]
    assert trigger_args[0] == 'GET'
    assert trigger_args[1].startswith('http://example.org/test/6/jump/trigger/paid')
    assert redirect.status_code == 302
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '10.50'

    # call callback with GET
    callback_url = get_url(with_payment_backend, 'lingo-callback', regie)
    with mock.patch(requests_target) as request:

        def kaboom():
            raise Exception('kaboom')

        # remote answer that fails as soon as its status is checked
        failing_response = mock.Mock()
        failing_response.status_code = 500
        failing_response.raise_for_status = kaboom
        request.return_value = failing_response
        get_resp = app.get(callback_url, params=data)
    notified_url = request.call_args[0][1]
    assert notified_url.startswith('http://example.org/test/6/jump/trigger/paid')
    assert get_resp.status_code == 200
    assert Transaction.objects.get(order_id=transaction_id).status == 3
    paid_item = BasketItem.objects.get(id=item.id)
    assert paid_item.payment_date
    assert not paid_item.notification_date

    # too soon
    lingo_config = apps.get_app_config('lingo')
    lingo_config.notify_payments()
    paid_item = BasketItem.objects.get(id=item.id)
    assert paid_item.payment_date
    assert not paid_item.notification_date

    # fake delay
    basket_item = BasketItem.objects.get(id=item.id)
    basket_item.payment_date = timezone.now() - timedelta(hours=1)
    basket_item.save()

    with mock.patch(requests_target) as request:
        ok_response = mock.Mock()
        ok_response.status_code = 200
        request.return_value = ok_response
        lingo_config.notify_payments()
    notified_url = request.call_args[0][1]
    assert notified_url.startswith('http://example.org/test/6/jump/trigger/paid')
    notified_item = BasketItem.objects.get(id=item.id)
    assert notified_item.payment_date
    assert notified_item.notification_date
def test_payment_callback_not_found(app, user, regie):
    """Both callback URLs must answer 404 when given a primary key of 0."""
    data = {'transaction_id': 42, 'signed': True, 'amount': 42, 'ok': True}
    unknown_targets = (
        ('lingo-callback', {'regie_pk': 0}),
        ('lingo-callback-payment-backend', {'payment_backend_pk': 0}),
    )
    for view_name, url_kwargs in unknown_targets:
        app.get(reverse(view_name, kwargs=url_kwargs), params=data, status=404)
@pytest.mark.parametrize("authenticated", [True, False])
def test_payment_no_basket(app, user_name_id, regie, authenticated):
    """Pay a single item through the payment URL returned by the add-item API.

    The item is created without a user; the flow is run both logged in and
    anonymously and covers a too small amount, the redirection to the
    payment backend (with the email it receives), an incomplete answer from
    the backend and finally a successful payment.
    """
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'
    source_url = 'http://example.org/item/1/'
    data = {'amount': 10, 'display_name': 'test item', 'url': source_url}
    if authenticated:
        data['NameId'] = user_name_id
    url = sign_url(reverse('api-add-basket-item'), settings.LINGO_API_SIGN_KEY)
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    payment_url = resp.json['payment_url']

    item = BasketItem.objects.first()
    assert item.user is None
    assert item.amount == Decimal('10.00')

    # the path of the payment URL wraps the signed identifier of the item
    path = urllib.parse.urlparse(payment_url).path
    start = '/lingo/item/'
    end = '/pay'
    assert path.startswith(start)
    assert path.endswith(end)
    signature = path.replace(start, '').replace(end, '')
    assert signing_loads(signature) == item.id

    if authenticated:
        app = login(app)

    # payment error due to too small amount
    item.amount = Decimal('1.00')
    item.save()
    resp = app.get(payment_url)
    assert_payment_status(resp.location)
    resp = resp.follow()
    assert 'Minimal payment amount is 4.50' in resp.text
    # we can go back to form
    assert source_url in resp.text

    # amount ok, redirection to payment backend
    item.amount = Decimal('10.00')
    item.save()
    with mock.patch(requests_target) as trigger_request:
        resp = app.get(payment_url)
    assert trigger_request.call_count == 1
    trigger_args = trigger_request.call_args[0]
    assert trigger_args[0] == 'GET'
    assert trigger_args[1].startswith('http://example.org/item/1/jump/trigger/paid')
    assert resp.location.startswith('http://dummy-payment.demo.entrouvert.com/')
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)
    assert qs['amount'] == ['10.00']
    if authenticated:
        assert qs['email'] == ['foo@example.com']
    else:
        assert 'email' not in qs

    # mail can be specified here for anonymous user
    with mock.patch(requests_target) as trigger_request:
        resp = app.get(
            payment_url,
            params={
                'email': 'foo@localhost',
            },
        )
    assert trigger_request.call_count == 1
    trigger_args = trigger_request.call_args[0]
    assert trigger_args[0] == 'GET'
    assert trigger_args[1].startswith('http://example.org/item/1/jump/trigger/paid')
    assert resp.location.startswith('http://dummy-payment.demo.entrouvert.com/')
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)
    assert qs['amount'] == ['10.00']
    if authenticated:
        assert qs['email'] == ['foo@example.com']
    else:
        assert qs['email'] == ['foo@localhost']

    # simulate a bad response from the payment backend, no transaction id
    data = {'amount': qs['amount'][0], 'signed': True}
    return_url = qs['return_url'][0]
    resp = app.get(return_url, params=data)
    assert_payment_status(resp.location)
    resp = resp.follow()
    assert 'We are sorry but the payment service failed to provide a correct answer.' in resp.text
    assert 'http://example.org/item/1' in resp.text
    # check that item is not paid
    item = BasketItem.objects.get(pk=item.pk)
    assert not item.payment_date

    # simulate successful payment response from dummy backend
    data = {'transaction_id': qs['transaction_id'][0], 'ok': True, 'amount': qs['amount'][0], 'signed': True}
    return_url = qs['return_url'][0]
    resp = app.get(return_url, params=data)
    assert_payment_status(resp.location, transaction_id=item.transaction_set.last().pk)
    # check that item is paid
    item = BasketItem.objects.get(pk=item.pk)
    assert item.payment_date
    # accept redirection to item payment status view
    resp = resp.follow()
    # which shows a waiting message and refers to item.source_url, as the item is paid
    assert 'Please wait while your request is being processed' in resp.text
    assert source_url in resp.text
def test_transaction_status_api(app, regie, user):
    """Check the transaction status API: unusable signatures, ownership rules and status payloads."""

    def status_url(payload):
        # the view is addressed by a signed payload (normally a transaction pk)
        return reverse('api-transaction-status', kwargs={'transaction_signature': signing_dumps(payload)})

    def create_transaction(**fields):
        return Transaction.objects.create(amount=Decimal('10.0'), regie=regie, **fields)

    # signed payload that is not a transaction identifier
    resp = app.get(status_url('xxxx'), status=404)
    assert 'Unknown transaction.' in resp.text

    # unknown transaction identifier
    resp = app.get(status_url(1000), status=404)
    assert 'Unknown transaction.' in resp.text

    waiting = {'wait': True, 'error': False, 'error_msg': ''}

    # anonymous user on anonymous transaction: OK
    url = status_url(create_transaction(status=0).pk)
    assert app.get(url).json == waiting

    # authenticated user on anonymous transaction: OK
    url = status_url(create_transaction(status=0).pk)
    assert login(app).get(url).json == waiting
    app.reset()

    # authenticated user on his own transaction: OK
    url = status_url(create_transaction(status=0, user=user).pk)
    assert login(app).get(url).json == waiting
    app.reset()

    forbidden = 'Transaction does not belong to the requesting user'

    # anonymous user on another user's transaction: NOTOK
    url = status_url(create_transaction(status=0, user=user).pk)
    assert forbidden in app.get(url, status=403).text

    # authenticated user on another user's transaction: NOTOK
    other_user = User.objects.create_user('user2', password='user2', email='user2@example.com')
    url = status_url(create_transaction(status=0, user=other_user).pk)
    assert forbidden in login(app).get(url, status=403).text
    app.reset()

    # expected JSON payload per eopayment status, requested anonymously
    payload_by_status = [
        (
            eopayment.ERROR,
            {
                'wait': True,
                'error': True,
                'error_msg': 'Payment error, you can continue and make another payment',
            },
        ),
        (eopayment.PAID, {'wait': False, 'error': False, 'error_msg': ''}),
        (
            eopayment.CANCELLED,
            {
                'wait': True,
                'error': False,
                'error_msg': 'Payment cancelled, you can continue and make another payment',
            },
        ),
    ]
    for status, payload in payload_by_status:
        url = status_url(create_transaction(status=status).pk)
        assert app.get(url).json == payload
def test_request_payment_exception(app, basket_page, regie, user):
    """A PaymentException raised by the backend request ends on a page reporting the failure."""
    BasketItem.objects.create(
        user=user,
        regie=regie,
        subject='test_item',
        amount='10.5',
        source_url='http://example.org/test/7/',
    )
    backend_patch = mock.patch('eopayment.dummy.Payment.request', autospec=True)
    trigger_patch = mock.patch('combo.utils.requests_wrapper.RequestsSession.request')

    with backend_patch as backend_request:
        backend_request.side_effect = eopayment.PaymentException
        basket = login(app).get(basket_page.get_online_url())
        with trigger_patch as trigger_request:
            failure_page = basket.form.submit().follow()

    # exactly one outgoing request, a GET on the item's "paid" trigger URL
    assert trigger_request.call_count == 1
    positional = trigger_request.call_args[0]
    assert positional[0] == 'GET'
    assert positional[1].startswith('http://example.org/test/7/jump/trigger/paid')
    assert 'Failed to initiate payment request' in failure_page.text
@pytest.mark.parametrize('transaction_date', [None, datetime(2020, 1, 1, 12, 00, 00, tzinfo=utc)])
def test_bank_transaction_date(app, key, regie, user, john_doe, caplog, transaction_date):
    """Check how the bank transaction date reported by the payment backend is recorded.

    A first mocked backend response (result 1) carries ``transaction_date`` (possibly ``None``);
    a second one (result 3) carries 2020-01-02T12:00 UTC.  When the first date was ``None`` the
    second one is stored; otherwise the first date is kept and a log line mentions the new date.
    The date is also expected in the JSON posted to the item's ``jump/trigger/paid`` URL.
    """
    amount = 12
    data = {'amount': amount, 'display_name': 'test amount', 'url': 'http://example.com/form/id/'}
    url = sign_url(reverse('api-add-basket-item') + '?orig=wcs', key)
    resp = app.post_json(url, params=data)

    # check that an unpaid item exists in basket
    assert BasketItem.objects.filter(regie=regie, amount=amount, payment_date__isnull=True).exists()
    payment_url = resp.json['payment_url']

    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as trigger_request:
        resp = app.get(payment_url, params={'next_url': 'http://example.net/form/id/'})
    assert trigger_request.call_count == 1
    assert trigger_request.call_args[0][0] == 'GET'
    assert trigger_request.call_args[0][1].startswith('http://example.com/form/id/jump/trigger/paid')

    assert Transaction.objects.count() == 1
    transaction = Transaction.objects.get()
    order_id = transaction.order_id

    # make sure the redirection is done to the payment backend
    assert resp.location.startswith('http://dummy-payment.demo.entrouvert.com/')
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)
    assert qs['amount'] == ['12.00']

    # simulate successful payment response from dummy backend; the transaction id is passed
    # as a plain string (it used to be wrapped in a 1-tuple by a stray trailing comma)
    data = {
        'transaction_id': qs['transaction_id'][0],
        'ok': True,
        'amount': qs['amount'][0],
        'signed': True,
    }
    return_url = qs['return_url'][0]

    mock_response = mock.Mock()
    mock_response.status_code = 200

    # simulate payment service redirecting the user to /lingo/return/... (eopayment
    # dummy module put that URL in return_url query string parameter).
    with mock.patch('eopayment.dummy.Payment.response') as eopayment_response:
        return_value = eopayment_response.return_value
        return_value.result = 1  # eopayment.RECEIVED
        return_value.order_id = order_id
        return_value.bank_data = {'a': 'b'}
        return_value.transaction_id = '1234'
        return_value.transaction_date = transaction_date
        with mock.patch(
            'combo.utils.requests_wrapper.RequestsSession.request', return_value=mock_response
        ) as request:
            app.get(return_url, params=data)
        # no outgoing request is expected for this first response
        assert request.call_count == 0

    # check transaction_date was recorded
    transaction.refresh_from_db()
    assert transaction.bank_transaction_date == transaction_date
    if transaction_date is None:
        assert 'no transaction date' in caplog.text

    # send another transaction_date
    with mock.patch('eopayment.dummy.Payment.response') as eopayment_response:
        return_value = eopayment_response.return_value
        return_value.result = 3  # eopayment.PAID
        return_value.order_id = order_id
        return_value.bank_data = {'a': 'b'}
        return_value.transaction_id = '1234'
        return_value.transaction_date = datetime(2020, 1, 2, 12, 0, tzinfo=utc)
        with mock.patch(
            'combo.utils.requests_wrapper.RequestsSession.post', return_value=mock_response
        ) as post:
            app.get(return_url, params=data)
        posted = json.loads(post.call_args[1]['data'])
        if transaction_date is None:
            assert posted['bank_transaction_date'] == '2020-01-02T12:00:00'
        else:
            assert posted['bank_transaction_date'] == '2020-01-01T12:00:00'
        assert post.call_args[0][0] == 'http://example.com/form/id/jump/trigger/paid'

    # a single refresh is enough to observe the stored date
    transaction.refresh_from_db()
    if transaction_date is None:
        assert transaction.bank_transaction_date == datetime(2020, 1, 2, 12, 0, tzinfo=utc)
    else:
        assert transaction.bank_transaction_date == transaction_date
        assert 'new transaction_date for transaction' in caplog.text
@pytest.fixture
def mono_regie(regie):
    """Return the ``regie`` fixture saved with ``can_pay_only_one_basket_item`` enabled."""
    regie.can_pay_only_one_basket_item = True
    regie.save()
    return regie
def test_successfull_items_can_pay_only_one_basket_item(app, basket_page, mono_regie, user):
    """Paying one item through its own payment URL removes only that item from the basket page."""
    paid_item = BasketItem.objects.create(
        user=user, regie=mono_regie, amount=42, subject='foo item', reference_id='form-3-23'
    )
    BasketItem.objects.create(user=user, regie=mono_regie, amount=84, subject='bar item')

    basket = login(app).get('/test_basket_cell/')
    for subject in ('foo item', 'bar item'):
        assert subject in basket.text

    redirect = basket.click('Pay', href=paid_item.payment_url)

    # successful payment
    query = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)
    assert query['subject'][0] == 'foo item', 'item.subject was not provided to eopayment'
    assert query['orderid'][0] == 'form-3-23'
    callback_params = {
        'transaction_id': query['transaction_id'][0],
        'signed': True,
        'ok': True,
        'reason': 'Paid',
    }
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request'):
        app.get(get_url(True, 'lingo-callback', mono_regie), params=callback_params)

    basket = app.get('/test_basket_cell/')
    assert 'foo item' not in basket.text
    assert 'bar item' in basket.text
def test_extra_kwargs_can_pay_only_one_basket_item(app, basket_page, mono_regie, user):
    """The ``eopayment_request_kwargs_info3`` entry of request_data shows up as ``info3`` in the redirect."""
    extra_data = {'eopayment_request_kwargs_info3': 'très fragile'}
    item = BasketItem.objects.create(
        user=user, regie=mono_regie, amount=42, subject='foo item', request_data=extra_data
    )

    redirect = login(app).get(item.payment_url)
    query = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)

    assert query['subject'][0] == 'foo item', 'item.subject was not provided to eopayment'
    assert query['info3'][0] == 'très fragile'
@mock.patch('combo.apps.lingo.models.BasketItem.get_items_to_be_paid')
def test_failure_can_only_pay_one_basket_item(
    get_items_to_be_paid, app, mono_regie, user, user_name_id, caplog
):
    """Posting to /lingo/pay when two items are pending on a single-item regie is refused and logged."""
    fake_item = mock.Mock(spec=['capture_date'], capture_date=None)
    # the patched lookup hands back the same fake item twice
    get_items_to_be_paid.return_value.filter.return_value = [fake_item] * 2

    app.set_user(user)
    home = app.get('/')
    csrf_token = home.context['csrf_token']

    # nothing reported yet
    assert '_messages' not in app.session
    assert not caplog.records

    pay_params = {'regie': mono_regie.id, 'csrfmiddlewaretoken': csrf_token}
    app.post('/lingo/pay', params=pay_params)

    assert 'Grouping basket items is not allowed.' in app.session['_messages']
    assert len(caplog.records) == 1
    assert 'regie can only pay one basket item' in caplog.records[0].message
def test_tipi_kwargs_can_pay_only_one_basket_item(app, basket_page, mono_regie, user):
    """``refdet``/``exer`` from request_data appear in the redirect once the backend service is ``tipi``."""
    tipi_data = {'refdet': 'F20201030', 'exer': '2020'}
    item = BasketItem.objects.create(
        user=user, regie=mono_regie, amount=42, subject='foo item', request_data=tipi_data
    )

    # with dummy refdet/exer are ignored
    redirect = login(app).get(item.payment_url)
    assert '2020' not in redirect.location

    backend = mono_regie.payment_backend
    backend.service = 'tipi'
    backend.save()

    redirect = login(app).get(item.payment_url)
    query = urllib.parse.parse_qs(urllib.parse.urlparse(redirect.location).query)
    assert query['refdet'][0] == 'F20201030'
    assert query['exer'][0] == '2020'
@mock.patch('eopayment.payfip_ws.Payment.request', return_value=(1, eopayment.URL, 'https://payfip/'))
def test_payfip_ws_kwargs_can_pay_only_one_basket_item(payment_request, app, basket_page, mono_regie, user):
    """``refdet``/``exer`` from request_data are passed as keyword arguments to the payfip_ws request."""
    payfip_data = {'refdet': 'F20201030', 'exer': '2020'}
    item = BasketItem.objects.create(
        user=user, regie=mono_regie, amount=42, subject='foo item', request_data=payfip_data
    )
    backend = mono_regie.payment_backend
    backend.service = 'payfip_ws'
    backend.save()

    redirect = login(app).get(item.payment_url)

    # the URL comes from the patched Payment.request return value
    assert redirect.location == 'https://payfip/'
    request_kwargs = payment_request.call_args[1]
    assert request_kwargs['refdet'] == 'F20201030'
    assert request_kwargs['exer'] == '2020'
@pytest.fixture
def remote_invoices_httmock():
    """Build an HTTMock context manager faking the invoice endpoints of a remote regie.

    The returned object carries three extra attributes: ``url`` (base URL of the fake service),
    ``invoices`` (list) and ``invoice`` (dict).  The handlers serialise those objects when a
    request is answered, so a test can fill them after obtaining the fixture.
    """
    host = 'remote.regie.example.com'
    single_invoice = {}
    invoice_list = []

    @httmock.urlmatch(netloc=host, path='^/invoices/')
    def answer_invoices(url, request):
        return json.dumps({'err': 0, 'data': invoice_list})

    @httmock.urlmatch(netloc=host, path='^/invoice/')
    def answer_invoice(url, request):
        return json.dumps({'err': 0, 'data': single_invoice})

    fake_service = httmock.HTTMock(answer_invoices, answer_invoice)
    fake_service.url = 'https://%s/' % host
    fake_service.invoices = invoice_list
    fake_service.invoice = single_invoice
    return fake_service
def test_email_from_basket(app, regie, remote_invoices_httmock):
    """Check which email address ends up in the redirection to the payment backend.

    Covers the per-item payment URL, then the ``/lingo/pay`` view with the local basket and with
    a remote regie, each time anonymously and authenticated as ``user1``.
    """

    def redirect_params(response):
        # query string of the redirection target, as a QueryDict
        return QueryDict(urllib.parse.urlparse(response.location).query)

    def fetch_csrf_token():
        return app.get('/').context['csrf_token']

    def logout():
        app.set_user(None)
        app.session.flush()

    def pay(token, **extra):
        params = {
            'regie': regie.id,
            'csrfmiddlewaretoken': token,
            'item': ['1'],
            'item_url': '/item-url/',
        }
        params.update(extra)
        return app.post('/lingo/pay', params=params)

    user1 = User.objects.create(username='user1', email='user1@example.com')
    user2 = User.objects.create(username='user2', email='user2@example.com')
    item1 = BasketItem.objects.create(user=user1, regie=regie, amount=42, subject='foo item')
    item12 = BasketItem.objects.create(user=user1, regie=regie, amount=42, subject='bar item')
    item2 = BasketItem.objects.create(user=user2, regie=regie, amount=42, subject='foo item')
    item3 = BasketItem.objects.create(regie=regie, amount=42, subject='foo item', email='no-user@example.com')
    item4 = BasketItem.objects.create(regie=regie, amount=42, subject='foo item')
    every_item = [item1, item12, item2, item3, item4]
    email_override = '?email=user3@example.com'

    # **Behaviours of BasketItemPayView**

    # not authenticated, email is from the basket item
    email_by_item = [
        (item1, user1.email),
        (item12, user1.email),
        (item2, user2.email),
        (item3, 'no-user@example.com'),
    ]
    for item, email in email_by_item:
        assert redirect_params(app.get(item.payment_url))['email'] == email

    # FIXME: if required, there should be an email requested (PayFiP, Paybox, etc..)
    assert 'email' not in redirect_params(app.get(item4.payment_url))

    # not authenticated, email in the query-string is ALWAYS used
    for item in every_item:
        params = redirect_params(app.get(item.payment_url + email_override))
        assert params['email'] == 'user3@example.com'

    # authenticated, the email from the authenticated user is ALWAYS used, even
    # if another email is given in the URL.
    app.set_user(user1)
    for item in every_item:
        params = redirect_params(app.get(item.payment_url + email_override))
        assert params['email'] == user1.email

    # **Behaviours of PayView**

    logout()

    # paying the local basket anonymously is refused
    response = pay(fetch_csrf_token())
    assert response.location == '/'
    assert 'Payment requires to be logged in.' in app.session['_messages']

    # Paying the basket always use the logged user email
    app.set_user(user1)
    response = pay(fetch_csrf_token(), email='no-user@example.com')
    assert response.location.startswith('http://dummy-payment.demo.entrouvert.com/')
    assert redirect_params(response)['email'] == 'user1@example.com'

    # Now try with a remote regie
    regie.webservice_url = remote_invoices_httmock.url
    regie.save()

    logout()

    # Get a response to extract csrf_token
    csrf_token = fetch_csrf_token()

    with remote_invoices_httmock:
        remote_invoices_httmock.invoice.update(
            {
                'id': 'F201601',
                'display_id': 'F-2016-One',
                'label': 'invoice-one',
                'regie': 'remote',
                'created': '2016-02-02',
                'pay_limit_date': '2999-12-31',
                'total_amount': '123.45',
                'amount': '123.45',
                'has_pdf': True,
                'online_payment': True,
                'paid': False,
                'payment_date': '1970-01-01',
                'no_online_payment_reason': '',
            }
        )

        # email is mandatory for paying a remote invoice without being authenticated
        response = pay(csrf_token)
        assert response.location == '/item-url/'
        assert 'You must give an email address.' in app.session['_messages']

        response = pay(csrf_token, email='no-user@example.com')
        assert response.location.startswith('http://dummy-payment.demo.entrouvert.com/')
        assert redirect_params(response)['email'] == 'no-user@example.com'

        # authenticated, always use the authenticated user's email
        app.set_user(user1)
        response = pay(fetch_csrf_token(), email='no-user@example.com')
        assert response.location.startswith('http://dummy-payment.demo.entrouvert.com/')
        assert redirect_params(response)['email'] == 'user1@example.com'
class TestPolling:
@pytest.fixture
def payment_backend(self, payment_backend):
    """Yield the backend switched to ``payfip_ws`` while ``Payment.request`` is patched.

    The patched request returns ``(1, eopayment.URL, 'https://payfip/')``.
    """
    payfip_request = mock.patch(
        'eopayment.payfip_ws.Payment.request', return_value=(1, eopayment.URL, 'https://payfip/')
    )
    with payfip_request:
        payment_backend.service = 'payfip_ws'
        payment_backend.save()
        # yield inside the patch so that it stays active while the fixture is in use
        yield payment_backend
class TestPollBackendCommand:
    """Tests of the ``lingo-poll-backend`` management command."""

    @pytest.fixture(autouse=True)
    def setup(self, regie, payment_backend):
        """Create a transaction (order id ``1234``, status 0) holding a single basket item."""
        basket_item = BasketItem.objects.create(amount=10, regie=regie)
        started = Transaction.objects.create(order_id='1234', status=0, amount=10, regie=regie)
        started.items.set([basket_item])

    @pytest.fixture
    def payment_status(self):
        """Patch ``eopayment.Payment.payment_status`` so that it reports order ``1234`` as paid."""
        with mock.patch('eopayment.Payment.payment_status') as payment_status:
            paid_answer = {
                'order_id': '1234',
                'result': eopayment.PAID,
                'transaction_date': now(),
                'transaction_id': '4567',
                'bank_data': {'abcd': 'xyz'},
                'signed': True,
            }
            payment_status.return_value = eopayment.common.PaymentResponse(**paid_answer)
            yield payment_status

    @pytest.mark.parametrize('cmd_options', [{'all_backends': True}, {'backend': 'test1'}])
    def test_ok(self, payment_status, freezer, cmd_options):
        """The transaction is polled, and updated, only once it is more than five minutes old."""
        # transactions are polled after 5 minutes.
        freezer.move_to(timedelta(minutes=4))
        call_command('lingo-poll-backend', **cmd_options)
        payment_status.assert_not_called()

        freezer.move_to(timedelta(minutes=1, seconds=1))
        call_command('lingo-poll-backend', **cmd_options)

        polled = Transaction.objects.get()
        payment_status.assert_called_once_with('1234', transaction_date=polled.start_date)
        polled.refresh_from_db()
        assert polled.status == eopayment.PAID
        assert polled.bank_transaction_date is not None
        assert polled.bank_data == {'abcd': 'xyz'}

    def test_max_age(self, payment_status, freezer):
        """Transactions older than the maximum age are not polled."""
        # transaction older than 1 day are ignored
        freezer.move_to(timedelta(days=1, minutes=1))
        call_command('lingo-poll-backend', all_backends=True, max_age_in_days=1)
        payment_status.assert_not_called()

        # default is 3 days
        freezer.move_to(timedelta(days=2))
        call_command('lingo-poll-backend', all_backends=True)
        payment_status.assert_not_called()

    def test_payment_exception(self, payment_status, freezer):
        """A backend failure leaves the transaction unpaid; without ``interactive=False`` it is raised."""
        payment_status.side_effect = eopayment.PaymentException('boom!!')

        # transactions are polled after 5 minutes.
        freezer.move_to(timedelta(minutes=5, seconds=1))
        call_command('lingo-poll-backend', interactive=False, all_backends=True)
        assert payment_status.call_count == 1
        assert Transaction.objects.get().status != eopayment.PAID

        with pytest.raises(LingoException):  # from combo.apps.lingo.models
            call_command('lingo-poll-backend', all_backends=True)

    def test_cli_ok(self):
        """The backend is polled once, whether its slug is given as an option or typed at the prompt."""
        poll_target = 'combo.apps.lingo.models.PaymentBackend.poll_backend'
        input_target = 'combo.apps.lingo.management.commands.lingo-poll-backend.input'

        with mock.patch(poll_target) as mock_poll_backend:
            call_command('lingo-poll-backend', backend='test1')
        assert mock_poll_backend.call_count == 1

        with mock.patch(poll_target) as mock_poll_backend, mock.patch(input_target) as mock_input:
            mock_input.return_value = 'test1'
            call_command('lingo-poll-backend')
        assert mock_poll_backend.call_count == 1

    def test_cli_errors(self):
        """Valid invocations pass; conflicting options or an unknown backend raise CommandError."""
        call_command('lingo-poll-backend', backend='test1')

        input_target = 'combo.apps.lingo.management.commands.lingo-poll-backend.input'
        with mock.patch(input_target) as mock_input:
            mock_input.return_value = 'test1'
            call_command('lingo-poll-backend')

        with pytest.raises(CommandError):
            call_command('lingo-poll-backend', all_backends=True, backend='test1')

        with pytest.raises(CommandError):
            call_command('lingo-poll-backend', backend='coin')

    def test_no_backend(self, capsys):
        """With no backend at all, the non-interactive command prints nothing."""
        PaymentBackend.objects.all().delete()
        call_command('lingo-poll-backend', verbosity=0, interactive=False, all_backends=True)
        captured = capsys.readouterr()
        assert (captured.out, captured.err) == ('', '')
class TestRecentTransactionsCell:
@pytest.fixture(autouse=True)
def setup(self, app, user, basket_page, mono_regie):
    """Create one basket item for ``user`` and add a recent-transactions cell to the basket page."""
    item_fields = {
        'user': user,
        'regie': mono_regie,
        'amount': 42,
        'subject': 'foo item',
        'request_data': {'refdet': 'F20201030', 'exer': '2020'},
    }
    BasketItem.objects.create(**item_fields)
    LingoRecentTransactionsCell(page=basket_page, placeholder='content', order=1).save()
@pytest.fixture
def app(self, app, user):
    """Return the ``app`` fixture after ``login()`` has been called on it."""
    login(app)
    return app
@mock.patch('eopayment.payfip_ws.Payment.payment_status')
def test_refresh_status_through_polling(
    self,
    payment_status,
    app,
    user,
    synchronous_cells,
):
    """Follow a payfip payment whose status is learnt through calls to ``payment_status``.

    The number of calls made to the patched ``payment_status`` is checked after each step.
    """
    basket_cell = '.lingo-basket-cell'
    transactions_cell = '.lingo-recent-transactions-cell'

    def backend_answer(result):
        return eopayment.common.PaymentResponse(
            signed=True,
            result=result,
            order_id=transaction.order_id,
        )

    def consume_polls(expected):
        # check the number of calls since the previous check, then start counting afresh
        assert payment_status.call_count == expected
        payment_status.reset_mock()

    # Try to pay
    first_page = app.get('/test_basket_cell/')
    assert 'foo item' in first_page
    assert 'Running' not in first_page
    redirect = first_page.click('Pay')

    # we are redirected to payfip
    assert redirect.location == 'https://payfip/'

    transaction = Transaction.objects.get()

    # Simulate still running status on polling
    payment_status.return_value = backend_answer(eopayment.WAITING)

    # check get_items_to_be_paid() does not poll anymore
    BasketItem.get_items_to_be_paid(user)
    assert payment_status.call_count == 0

    # Try to pay again, only with current information
    synchronous_cells.off()
    page = app.get('/test_basket_cell/')
    assert 'Loading' in page.pyquery(basket_cell).text()
    assert 'Loading' in page.pyquery(transactions_cell).text()

    page = first_page.click('Pay').follow()
    assert 'Some items are already paid or' in page
    assert len(page.pyquery(basket_cell)) == 0
    assert 'Loading' in page.pyquery(transactions_cell).text()
    consume_polls(1)  # pay made a call to payfip backend

    # Make rendering synchronous and retry
    synchronous_cells.on()
    page = app.get('/test_basket_cell/')
    assert len(page.pyquery(basket_cell)) == 0
    assert 'Running' in page.pyquery(transactions_cell).text()
    consume_polls(1)  # transactions cell polled

    page = first_page.click('Pay')
    consume_polls(1)  # pay polled

    page = page.follow()
    assert 'Some items are already paid or' in page
    assert len(page.pyquery(basket_cell)) == 0
    assert 'Running' in page.pyquery(transactions_cell).text()
    consume_polls(1)  # transactions cell polled

    # Simulate paid status on polling
    payment_status.return_value = backend_answer(eopayment.PAID)

    # Try to pay again
    page = app.get('/test_basket_cell/')
    assert 'foo item: 42.00' in page
    assert 'Pay' not in page
    assert 'Running' not in page
    assert len(page.pyquery(basket_cell)) == 0
    assert '42.00' in page.pyquery(transactions_cell).text()
    consume_polls(1)  # transactions cell polled
# Polling failure case: once the payment is started, the patched payment_status is made to
# raise, the basket page is requested again and the last log record is expected to be a
# WARNING mentioning the polling of the transaction.
@mock.patch('eopayment.payfip_ws.Payment.payment_status')
def test_exception_during_polling(
self,
payment_status,
app,
synchronous_cells,
caplog,
):
# Try to pay
pay_resp = app.get('/test_basket_cell/')
assert 'foo item' in pay_resp
assert 'Running' not in pay_resp
resp = pay_resp.click('Pay')
# we are redirected to payfip
assert resp.location == 'https://payfip/'
# Simulate polling failure
payment_status.side_effect = eopayment.PaymentException('boom!')
# Try to pay again
resp = app.get('/test_basket_cell/')
# NOTE(review): the two assertions below re-check pay_resp (fetched before the failure was
# set up), not the resp fetched just above - confirm whether resp was intended.
assert 'foo item' in pay_resp
assert 'Running' not in pay_resp
# the failure is expected to surface only as a warning in the logs
last_record = caplog.records[-1]
assert last_record.levelname == 'WARNING'
assert 'polling backend for transaction' in last_record.message