940 lines
41 KiB
Python
940 lines
41 KiB
Python
from contextlib import contextmanager
|
|
import eopayment
|
|
import pytest
|
|
from datetime import datetime, timedelta
|
|
import urllib
|
|
from decimal import Decimal
|
|
import json
|
|
import mock
|
|
|
|
from django.apps import apps
|
|
from django.contrib.auth.models import User
|
|
from django.core.urlresolvers import reverse
|
|
from django.core.wsgi import get_wsgi_application
|
|
from django.conf import settings
|
|
from django.utils import timezone
|
|
from django.utils.six.moves.urllib import parse as urlparse
|
|
from django.contrib.messages.storage.session import SessionStorage
|
|
from webtest import TestApp
|
|
|
|
from combo.data.models import Page
|
|
from combo.apps.lingo.models import (
|
|
Regie, BasketItem, Transaction, TransactionOperation, RemoteItem, EXPIRED, LingoBasketCell,
|
|
PaymentBackend)
|
|
from combo.utils import sign_url
|
|
|
|
from .test_manager import login
|
|
|
|
pytestmark = pytest.mark.django_db
|
|
|
|
|
|
def get_url(with_payment_backend, view_name, regie):
    """Return the URL of a lingo view, addressed by regie or by payment backend.

    When ``with_payment_backend`` is true, the ``<view_name>-payment-backend``
    route is reversed with the primary key of the regie's payment backend;
    otherwise ``view_name`` itself is reversed with the regie primary key.
    """
    if not with_payment_backend:
        return reverse(view_name, kwargs={'regie_pk': regie.id})
    backend_view_name = view_name + '-payment-backend'
    backend_pk = regie.payment_backend.id
    return reverse(backend_view_name, kwargs={'payment_backend_pk': backend_pk})
|
|
|
|
|
|
@contextmanager
def check_log(caplog, message):
    """Assert that ``message`` gets logged while the managed block runs.

    Only the records appended to ``caplog.records`` after the context was
    entered are inspected; ``message`` must be a substring of the ``message``
    attribute of at least one of them, otherwise an AssertionError is raised
    when the context exits.
    """
    first_new_record = len(caplog.records)
    yield
    found = False
    for record in caplog.records[first_new_record:]:
        if message in record.message:
            found = True
            break
    assert found, '%r not found in log records' % message
|
|
|
|
|
|
@pytest.fixture
def regie():
    """Provide the ``test`` regie bound to the ``test1`` dummy payment backend.

    Both objects are looked up by slug and created when they do not exist yet.
    """
    backends = PaymentBackend.objects
    try:
        backend = backends.get(slug='test1')
    except PaymentBackend.DoesNotExist:
        backend = backends.create(
            label='test1', slug='test1', service='dummy',
            service_options={'siret': '1234'})

    try:
        test_regie = Regie.objects.get(slug='test')
    except Regie.DoesNotExist:
        test_regie = Regie()
        test_regie.label = 'Test'
        test_regie.slug = 'test'
        test_regie.description = 'test'
        test_regie.payment_min_amount = Decimal(4.5)
        test_regie.payment_backend = backend
        test_regie.save()
    return test_regie
|
|
|
|
@pytest.fixture
def basket_page():
    """Provide a saved ``test_basket_cell`` page holding a single basket cell."""
    basket = Page(title='xxx', slug='test_basket_cell', template_name='standard')
    basket.save()
    basket_cell = LingoBasketCell(page=basket, placeholder='content', order=0)
    basket_cell.save()
    return basket
|
|
|
|
@pytest.fixture
def user():
    """Provide the ``admin`` user, creating it when it does not exist yet."""
    try:
        return User.objects.get(username='admin')
    except User.DoesNotExist:
        return User.objects.create_user(
            'admin', password='admin', email='foo@example.com')
|
|
|
|
@pytest.fixture(params=['orig', 'sign_key'])
def key(request, settings):
    """Provide a signing key, once per supported signature scheme.

    * ``orig``: a ``wcs`` known service is registered in the settings and its
      secret is returned;
    * ``sign_key``: the configured ``LINGO_API_SIGN_KEY`` is returned.
    """
    if request.param != 'orig':
        return settings.LINGO_API_SIGN_KEY

    secret = 'abcde'
    wcs_service = {
        'url': 'http://example.org/',
        'verif_orig': 'wcs',
        'secret': secret,
    }
    settings.KNOWN_SERVICES = {'wcs': {'wcs1': wcs_service}}
    return secret
|
|
|
|
def test_default_regie():
    """Check how the ``is_default`` flag is handed over between regies."""
    backend = PaymentBackend.objects.create(label='foo', slug='foo')
    Regie.objects.all().delete()

    # with no other regie around, the first one saved is expected to be
    # flagged as default
    first = Regie(label='foo', slug='foo', payment_backend=backend)
    first.save()
    assert bool(first.is_default) is True

    second = Regie(label='bar', slug='bar', payment_backend=backend)
    second.save()
    assert bool(second.is_default) is False

    # flagging the second regie is expected to unflag the first one
    second.is_default = True
    second.save()
    second = Regie.objects.get(id=second.id)
    assert bool(second.is_default) is True
    first = Regie.objects.get(id=first.id)
    assert bool(first.is_default) is False
|
|
|
|
def test_regie_api(app):
    """The regies API lists every regie, with the default one first."""
    resp = app.get(reverse('api-regies'))
    assert len(json.loads(resp.content).get('data')) == 0

    # reuse the scenario of test_default_regie to get two regies in database
    test_default_regie()

    resp = app.get(reverse('api-regies'))
    regies = json.loads(resp.content).get('data')
    assert len(regies) == 2
    default_regie = Regie.objects.get(is_default=True)
    assert regies[0]['id'] == default_regie.slug
|
|
|
|
def test_payment_min_amount(app, basket_page, regie, user):
    """Submitting a basket below the regie minimum amount yields a redirect.

    The two items total 3.9 while the ``regie`` fixture sets
    ``payment_min_amount`` to 4.5.
    """
    items = {
        'item1': {'amount': '1.5', 'source_url': '/item/1'},
        'item2': {'amount': '2.4', 'source_url': '/item/2'},
    }
    # the created rows are only needed in database, their pks are not used
    for subject, details in items.items():
        BasketItem.objects.create(user=user, regie=regie, subject=subject, **details)

    resp = login(app).get('/test_basket_cell/')
    resp = resp.form.submit()
    assert resp.status_code == 302
|
|
|
|
|
|
def test_transaction_manual_validation(app, basket_page, user, monkeypatch):
    """The regie ``manual_validation`` option is passed to the payment request."""
    backend = PaymentBackend.objects.create(
        label='test1', slug='test1', service='payzen',
        service_options={'vads_site_id': '12345678', 'secret_test': 'plkoBdfcx987dhft6'})
    regie = Regie.objects.create(
        label='Test', slug='test', description='test', payment_backend=backend,
        transaction_options={'manual_validation': True})
    BasketItem.objects.create(
        user=user, regie=regie, subject='item1', amount='1.5', source_url='/item/1')

    class MockPayment(object):
        # stands in for the eopayment object; records the arguments of request()
        request = mock.Mock(return_value=(9876, 3, {}))

    def get_eopayment_object(*args, **kwargs):
        return MockPayment

    import combo.apps.lingo.views
    monkeypatch.setattr(combo.apps.lingo.views, 'get_eopayment_object', get_eopayment_object)

    resp = login(app).get('/test_basket_cell/')
    resp.form.submit()

    # fail with a clear message rather than a TypeError on a None call_args
    assert MockPayment.request.called
    # call_args[1] holds the keyword arguments on every version of mock,
    # whereas call_args.kwargs requires Python 3.8+ / mock 3.0+
    assert MockPayment.request.call_args[1]['manual_validation'] is True
|
|
|
|
|
|
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_successfull_items_payment(app, basket_page, regie, user, with_payment_backend):
    """Pay a basket of four items through the dummy backend, callback then return."""
    items = {
        'item1': {'amount': '10.5', 'source_url': 'http://example.org/item/1'},
        'item2': {'amount': '42', 'source_url': 'http://example.org/item/2'},
        'item3': {'amount': '100', 'source_url': 'http://example.org/item/3'},
        'item4': {'amount': '354', 'source_url': 'http://example.org/item/4'},
    }
    # the created rows are only needed in database, their pks are not used
    for subject, details in items.items():
        BasketItem.objects.create(user=user, regie=regie, subject=subject, **details)

    resp = login(app).get('/test_basket_cell/')
    resp = resp.form.submit()
    assert resp.status_code == 302
    location = resp.location
    assert 'dummy-payment' in location
    parsed = urlparse.urlparse(location)
    # get return_url and transaction id from location
    qs = urlparse.parse_qs(parsed.query)
    args = {'transaction_id': qs['transaction_id'][0], 'signed': True,
            'ok': True, 'reason': 'Paid'}
    # make sure return url is the user return URL
    assert urlparse.urlparse(qs['return_url'][0]).path.startswith(
        reverse('lingo-return-payment-backend',
                kwargs={'payment_backend_pk': regie.payment_backend.id}))
    # simulate successful call to callback URL; outgoing requests are mocked
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request'):
        resp = app.get(get_url(with_payment_backend, 'lingo-callback', regie), params=args)
    assert resp.status_code == 200
    # simulate successful return URL
    resp = app.get(qs['return_url'][0], params=args)
    assert resp.status_code == 302
    assert urlparse.urlparse(resp.url).path == '/test_basket_cell/'
    resp = resp.follow()
    assert 'Your payment has been succesfully registered.' in resp.text
|
|
|
|
def test_add_amount_to_basket(app, key, regie, user):
    """Exercise the add-basket-item API: amount computation and regie selection.

    This function is also called directly by other tests of this module to
    populate the basket, so its signature and the items it creates matter.
    """
    payment_backend = PaymentBackend.objects.create(
        label='test2', slug='test2', service='dummy', service_options={'siret': '1234'})
    other_regie = Regie(label='test2', slug='test2', payment_backend=payment_backend)
    other_regie.save()

    user_email = 'foo@example.com'
    User.objects.get_or_create(email=user_email)
    amount = 42
    data = {'amount': amount, 'display_name': 'test amount',
            'url': 'http://example.com'}

    # plain amount, expected on the regie given by the fixture
    url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email)
    url = sign_url(url, key)
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert BasketItem.objects.filter(amount=amount).exists()
    assert BasketItem.objects.filter(amount=amount)[0].regie_id == regie.id

    # an amount in "extra" is added to the main one: 42 + 22.22
    data['extra'] = {'amount': '22.22'}
    url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email)
    url = sign_url(url, key)
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert BasketItem.objects.filter(amount=Decimal('64.22')).exists()

    # lists of amounts plus one in the query string: 42 + 22.22 + 12 + 5
    data['amount'] = [amount]
    data['extra'] = {'amount': ['22.22', '12']}
    url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email)
    url = sign_url(url, key)
    resp = app.post_json('%s&amount=5' % url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert BasketItem.objects.filter(amount=Decimal('81.22')).exists()

    # without regie_id the item is expected on the (new) default regie
    other_regie.is_default = True
    other_regie.save()
    data['amount'] = []
    data['extra'] = {'amount': '22.23'}
    url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email)
    url = sign_url(url, settings.LINGO_API_SIGN_KEY)
    resp = app.post_json(url, params=data)
    item = BasketItem.objects.get(amount=Decimal('22.23'))
    assert resp.status_code == 200
    response = json.loads(resp.content)
    assert response['result'] == 'success'
    assert response['payment_url'].endswith('/lingo/item/%s/pay' % item.id)
    assert BasketItem.objects.filter(amount=Decimal('22.23')).exists()
    assert BasketItem.objects.filter(amount=Decimal('22.23'))[0].regie_id == other_regie.id

    # regie selected by id; "&regie_id" must be spelled out, the "&reg" prefix
    # is easily mangled into the (R) sign by HTML-entity decoding
    url = '%s?email=%s&regie_id=%s' % (
        reverse('api-add-basket-item'), user_email, regie.id)
    data['extra'] = {'amount': '22.24', 'foo': 'bar'}
    url = sign_url(url, settings.LINGO_API_SIGN_KEY)
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert BasketItem.objects.filter(amount=Decimal('22.24')).exists()
    assert BasketItem.objects.filter(amount=Decimal('22.24'))[0].regie_id == regie.id
    assert BasketItem.objects.filter(amount=Decimal('22.24'))[0].request_data == data['extra']

    # regie selected by slug
    url = '%s?email=%s&regie_id=%s' % (
        reverse('api-add-basket-item'), user_email, regie.slug)
    data['extra'] = {'amount': '13.67'}
    url = sign_url(url, settings.LINGO_API_SIGN_KEY)
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert BasketItem.objects.filter(amount=Decimal('13.67')).exists()
    assert BasketItem.objects.filter(amount=Decimal('13.67'))[0].regie_id == regie.id

    # unknown regie identifier is rejected
    url = '%s?email=%s&orig=wcs&regie_id=%s' % (
        reverse('api-add-basket-item'), user_email, 'scarecrow')
    url = sign_url(url, key)
    resp = app.post_json(url, params=data, status=400)
    assert resp.text == 'Unknown regie'
|
|
|
|
|
|
def test_basket_item_with_capture_date(app, user, regie, basket_page, monkeypatch):
    """A capture date given to the API is stored and passed to the payment request."""
    User.objects.get_or_create(email=user.email)
    capture_date = timezone.now().date()
    payload = {
        'amount': 10,
        'capture_date': capture_date.isoformat(),
        'display_name': 'test item',
    }
    add_url = '%s?email=%s' % (reverse('api-add-basket-item'), user.email)
    resp = app.post_json(sign_url(add_url, settings.LINGO_API_SIGN_KEY), params=payload)
    assert resp.status_code == 200
    assert BasketItem.objects.all()[0].capture_date == capture_date

    resp = login(app).get('/test_basket_cell/')
    # replace the eopayment request so its call arguments can be inspected
    eopayment_mock = mock.Mock(
        return_value=('orderid', eopayment.URL, 'http://dummy-payment.demo.entrouvert.com/'))
    monkeypatch.setattr(eopayment.Payment, 'request', eopayment_mock)
    resp = resp.form.submit()
    assert resp.status_code == 302
    target = urlparse.urlparse(resp.location)
    assert target.path == '/'
    assert target.hostname == 'dummy-payment.demo.entrouvert.com'
    eopayment_mock.assert_called_once_with(
        Decimal(10), email=user.email, first_name=user.first_name,
        last_name=user.last_name, capture_date=capture_date)
|
|
|
|
|
|
@pytest.mark.parametrize("invalid_capture_date", [8, '', 'not-a-date'])
def test_add_basket_capture_date_format(app, user, regie, invalid_capture_date):
    """A capture date that is not a yyyy-mm-dd string is rejected with a 400."""
    url = '%s?email=%s' % (reverse('api-add-basket-item'), user.email)
    data = {
        'amount': 10,
        'display_name': 'test item',
        'capture_date': invalid_capture_date,
    }
    url = sign_url(url, settings.LINGO_API_SIGN_KEY)
    resp = app.post_json(url, params=data, status=400)
    # compare against the decoded body: resp.content is bytes on Python 3 and
    # a str-in-bytes membership test raises TypeError
    assert 'Bad format for capture date, it should be yyyy-mm-dd.' in resp.text
|
|
|
|
|
|
def test_cant_pay_if_different_capture_date(app, basket_page, regie, user):
    """Basket items with distinct capture dates cannot be paid together."""
    capture1 = (timezone.now() + timedelta(days=1)).date()
    capture2 = (timezone.now() + timedelta(days=2)).date()
    items = {
        'item1': {
            'amount': '10.5', 'capture_date': capture1.isoformat(),
            'source_url': 'http://example.org/item/1',
        },
        'item2': {
            'amount': '42', 'capture_date': capture2.isoformat(),
            'source_url': 'http://example.org/item/2',
        },
    }
    # the created rows are only needed in database, their pks are not used
    for subject, details in items.items():
        BasketItem.objects.create(
            user=user, regie=regie, subject=subject, **details)

    resp = login(app).get('/test_basket_cell/')
    resp = resp.form.submit()
    assert resp.status_code == 302
    assert urlparse.urlparse(resp.location).path == '/test_basket_cell/'
    resp = resp.follow()
    # compare against the decoded body: resp.content is bytes on Python 3 and
    # a str-in-bytes membership test raises TypeError
    assert "Invalid grouping for basket items: different capture dates." in resp.text
|
|
|
|
|
|
def test_pay_single_basket_item(app, key, regie, user, john_doe):
    """Pay one basket item directly through the ``payment_url`` given by the API."""
    index_page = Page(title='xxx', slug='index', template_name='standard')
    index_page.save()
    LingoBasketCell(page=index_page, placeholder='content', order=0).save()

    amount = 12
    item_data = {
        'amount': amount,
        'display_name': 'test amount',
        'url': 'http://example.com',
    }
    add_url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user.email)
    resp = app.post_json(sign_url(add_url, key), params=item_data)
    # check that an unpaid item exists in basket
    assert BasketItem.objects.filter(regie=regie, amount=amount, payment_date__isnull=True).exists()
    payment_url = resp.json['payment_url']

    # not logged in
    resp = app.get(payment_url, status=403)
    assert 'No item payment allowed for anonymous users.' in resp.text

    # logged in as john.doe while the item was added for user.email
    login(app, username='john.doe', password='john.doe')
    resp = app.get(payment_url, status=403)
    assert 'Wrong item: payment not allowed.' in resp.text

    # forbid payment to regie with extra_fees_ws_url
    regie.extra_fees_ws_url = 'http://example.com/extra-fees'
    regie.save()
    app.reset()
    login(app)
    resp = app.get(payment_url, status=403)
    assert 'No item payment allowed as extra fees set.' in resp.text

    regie.extra_fees_ws_url = ''
    regie.save()

    resp = app.get(payment_url, params={'next_url': 'http://example.net/form/id/'})

    # make sure the redirection is done to the payment backend
    assert resp.location.startswith('http://dummy-payment.demo.entrouvert.com/')
    qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query)
    assert qs['amount'] == ['12.00']

    # simulate successful payment response from dummy backend
    backend_answer = {
        'transaction_id': qs['transaction_id'][0],
        'ok': True,
        'amount': qs['amount'][0],
        'signed': True,
    }
    # simulate payment service redirecting the user to /lingo/return/... (eopayment
    # dummy module put that URL in return_url query string parameter).
    resp = app.get(qs['return_url'][0], params=backend_answer)
    # check that item is paid
    assert BasketItem.objects.filter(regie=regie, amount=amount, payment_date__isnull=False).exists()
    # check that user is redirected to the next_url passed previously
    assert resp.location == 'http://example.net/form/id/'
|
|
|
|
def test_pay_multiple_regies(app, key, regie, user):
    """Items of two regies are paid through two separate forms of the basket page."""
    # populate the basket with the items created by test_add_amount_to_basket
    test_add_amount_to_basket(app, key, regie, user)

    basket = Page(title='xxx', slug='test_basket_cell', template_name='standard')
    basket.save()
    LingoBasketCell(page=basket, placeholder='content', order=0).save()

    def requested_amount(response):
        # amount found in the query string of the payment redirection
        query = urlparse.urlparse(response.location).query
        return urlparse.parse_qs(query)['amount']

    resp = login(app).get(basket.get_online_url())
    resp = resp.forms[0].submit()
    assert resp.location.startswith('http://dummy-payment.demo.entrouvert.com/')
    assert requested_amount(resp) == ['223.35']

    resp = login(app).get(basket.get_online_url())
    resp = resp.forms[1].submit()
    assert requested_amount(resp) == ['22.23']
|
|
|
|
def test_pay_as_anonymous_user(app, key, regie, user):
    """Submitting the basket form once the session cookie is gone is refused."""
    # populate the basket with the items created by test_add_amount_to_basket
    test_add_amount_to_basket(app, key, regie, user)

    basket = Page(title='xxx', slug='test_basket_cell', template_name='standard')
    basket.save()
    LingoBasketCell(page=basket, placeholder='content', order=0).save()

    resp = login(app).get(basket.get_online_url())
    # drop the session cookie between page display and form submission
    app.cookiejar.clear(domain='testserver.local', path='/', name='sessionid')
    resp = resp.forms[0].submit()
    resp = resp.follow()
    assert 'Payment requires to be logged in.' in resp.text
|
|
|
|
def test_cancel_basket_item(app, key, regie, user):
    """Remove basket items through the API, with and without notification."""
    user_email = 'foo@example.com'
    User.objects.get_or_create(email=user_email)
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'

    def active_items(amount):
        # items of the given amount that have not been cancelled
        return BasketItem.objects.filter(amount=amount, cancellation_date__isnull=True)

    url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email)
    url = sign_url(url, key)
    data = {'amount': 42, 'display_name': 'test amount', 'url':
            'http://example.com/', 'notify': 'true'}
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert active_items(42).exists()
    basket_item_id = json.loads(resp.content)['id']

    data = {'amount': 21, 'display_name': 'test amount', 'url': 'http://example.net/'}
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert active_items(42).exists()
    assert active_items(21).exists()
    basket_item_id_2 = json.loads(resp.content)['id']

    # removal with notify: the source URL gets a "cancelled" trigger
    with mock.patch(requests_target) as request:
        url = '%s?email=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_email)
        url = sign_url(url, key)
        data = {'basket_item_id': basket_item_id, 'notify': 'true'}
        resp = app.post_json(url, params=data)
    assert request.call_args[0] == ('POST', u'http://example.com/jump/trigger/cancelled')
    assert not active_items(42).exists()
    assert active_items(21).exists()

    # removal without notify: no outgoing request at all
    with mock.patch(requests_target) as request:
        url = '%s?email=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_email)
        url = sign_url(url, key)
        data = {'basket_item_id': basket_item_id_2}
        resp = app.post_json(url, params=data)
    assert request.call_count == 0
    assert not active_items(42).exists()
    assert not active_items(21).exists()
|
|
|
|
|
|
def test_cancel_basket_item_from_cell(app, key, regie, user):
    """Cancel basket items from the web interface in allowed and refused cases."""
    basket = Page(title='xxx', slug='test_basket_cell', template_name='standard')
    basket.save()
    LingoBasketCell(page=basket, placeholder='content', order=0).save()

    url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user.email)
    url = sign_url(url, key)
    data = {'amount': 42, 'display_name': 'test amount', 'url': 'http://example.org/testitem/'}
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    added = json.loads(resp.content)
    assert added['result'] == 'success'
    assert BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists()
    basket_item_id = added['id']
    cancel_url = reverse('lingo-cancel-item', kwargs={'pk': basket_item_id})

    # check while not logged in
    resp = app.get(cancel_url, status=404)
    assert BasketItem.objects.filter(id=basket_item_id).exists()

    # check a successful case
    app = login(app)
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
        resp = app.get(cancel_url)
        resp = resp.form.submit()
    url = request.call_args[0][1]
    assert url.startswith('http://example.org/testitem/jump/trigger/cancelled')
    assert BasketItem.objects.filter(id=basket_item_id, cancellation_date__isnull=False).exists()

    # check removal of an item that is not cancellable
    url = '%s?email=%s&cancellable=no&orig=wcs' % (reverse('api-add-basket-item'), user.email)
    url = sign_url(url, key)
    data = {'amount': 21, 'display_name': 'test amount',
            'url': 'http://example.org/testitem/'}
    resp = app.post_json(url, params=data)
    basket_item2_id = json.loads(resp.content)['id']
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert BasketItem.objects.filter(amount=21, cancellation_date__isnull=True).exists()

    resp = app.get(reverse('lingo-cancel-item', kwargs={'pk': basket_item2_id}))
    resp = resp.form.submit()
    resp = resp.follow()
    assert 'This item cannot be removed.' in resp.text

    # check removal of the item of another user
    user_email = 'bar@example.com'
    User.objects.get_or_create(email=user_email)
    url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email)
    url = sign_url(url, key)
    data = {'amount': 42, 'display_name': 'test amount', 'url': 'http://example.org/testitem/'}
    resp = app.post_json(url, params=data)
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    basket_item_id = json.loads(resp.content)['id']

    other_cancel_url = reverse('lingo-cancel-item', kwargs={'pk': basket_item_id})
    app.get(other_cancel_url, status=404)
    app.post(other_cancel_url, status=403)
|
|
|
|
|
|
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_payment_callback(app, basket_page, regie, user, with_payment_backend):
    """The payment callback is accepted with GET and with POST, then the return view."""
    Page(title='xxx', slug='index', template_name='standard').save()
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'

    BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='10.5',
        source_url='http://example.org/testitem/')
    resp = login(app).get(basket_page.get_online_url())
    resp = resp.form.submit()
    assert resp.status_code == 302
    qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '10.50'

    # call callback with GET
    callback_url = get_url(with_payment_backend, 'lingo-callback', regie)
    with mock.patch(requests_target) as request:
        get_resp = app.get(callback_url, params=data)
    url = request.call_args[0][1]
    assert url.startswith('http://example.org/testitem/jump/trigger/paid')
    assert get_resp.status_code == 200
    assert Transaction.objects.get(order_id=transaction_id).status == 3

    BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='11.5',
        source_url='http://example.org/testitem/')
    resp = login(app).get(basket_page.get_online_url())
    resp = resp.form.submit()
    assert resp.status_code == 302
    qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '11.50'

    # call callback with POST
    with mock.patch(requests_target):
        post_resp = app.post(callback_url, params=data)
    assert post_resp.status_code == 200
    assert Transaction.objects.get(order_id=transaction_id).status == 3

    # call return view
    return_url = get_url(with_payment_backend, 'lingo-return', regie)
    get_resp = app.get(return_url, params=data)
    assert get_resp.status_code == 302
    resp = app.get(get_resp['Location'])
    assert 'Your payment has been succesfully registered.' in resp.text
|
|
|
|
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_payment_callback_no_regie(app, basket_page, regie, user, with_payment_backend):
    """Validate a first payment through the callback, then start a second one."""
    BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='10.5',
        source_url='http://example.org/testitem/')
    resp = login(app).get(basket_page.get_online_url())
    resp = resp.form.submit()
    assert resp.status_code == 302
    qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '10.50'

    # call callback with GET
    callback_url = get_url(with_payment_backend, 'lingo-callback', regie)
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
        get_resp = app.get(callback_url, params=data)
    url = request.call_args[0][1]
    assert url.startswith('http://example.org/testitem/jump/trigger/paid')
    assert get_resp.status_code == 200
    assert Transaction.objects.get(order_id=transaction_id).status == 3

    # second payment: only the redirection to the backend is checked
    BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='11.5',
        source_url='http://example.org/testitem/')
    resp = login(app).get(basket_page.get_online_url())
    resp = resp.form.submit()
    assert resp.status_code == 302
    qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '11.50'
|
|
|
|
|
|
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_nonexisting_transaction(app, regie, user, with_payment_backend):
    """The callback answers 404 for a transaction id that does not exist."""
    app = login(app)
    payload = {
        'transaction_id': 'unknown',
        'signed': True,
        'amount': '23',
        'ok': True,
    }

    # call callback with GET
    app.get(get_url(with_payment_backend, 'lingo-callback', regie),
            params=payload, status=404)
|
|
|
|
|
|
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_payment_callback_waiting(app, basket_page, regie, user, with_payment_backend):
    """A transaction goes through the WAITING state before being PAID."""
    item = BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='10.5',
        source_url='http://example.org/testitem/')
    resp = login(app).get(basket_page.get_online_url())
    resp = resp.form.submit()
    assert resp.status_code == 302
    qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'waiting': True}
    assert data['amount'] == '10.50'

    # callback with WAITING state
    callback_url = get_url(with_payment_backend, 'lingo-callback', regie)
    resp = app.get(callback_url, params=data)
    assert resp.status_code == 200
    assert Transaction.objects.get(order_id=transaction_id).status == eopayment.WAITING
    waiting_item = BasketItem.objects.get(id=item.id)
    assert waiting_item.waiting_date
    assert not BasketItem.objects.get(id=item.id).payment_date
    assert BasketItem.get_items_to_be_paid(user).count() == 0

    # callback with PAID state
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'ok': True}
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
        resp = app.get(callback_url, params=data)
    assert resp.status_code == 200
    url = request.call_args[0][1]
    assert url.startswith('http://example.org/testitem/jump/trigger/paid')

    assert Transaction.objects.get(order_id=transaction_id).status == eopayment.PAID
    assert BasketItem.objects.get(id=item.id).payment_date
    assert BasketItem.get_items_to_be_paid(user).count() == 0
|
|
|
|
|
|
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_payment_no_callback_just_return(
        caplog, app, basket_page, regie, user, with_payment_backend):
    """Without callback, the return view registers the payment only if signed."""
    requests_target = 'combo.utils.requests_wrapper.RequestsSession.request'
    BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='10.5',
        source_url='http://example.org/testitem/')
    resp = login(app).get(basket_page.get_online_url())
    resp = resp.form.submit()
    assert resp.status_code == 302
    qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id,
            'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '10.50'

    # call return with unsigned POST
    with check_log(caplog, 'received unsigned payment'):
        return_url = get_url(with_payment_backend, 'lingo-return', regie)
        with mock.patch(requests_target) as request:
            get_resp = app.post(return_url, params=data)
    assert request.call_count == 0
    assert get_resp.status_code == 302
    assert urlparse.urlparse(get_resp['location']).path == '/test_basket_cell/'
    assert Transaction.objects.get(order_id=transaction_id).status == 0  # not paid

    # call return with missing data
    with check_log(caplog, 'failed to process payment response: missing transaction_id'):
        baddata = data.copy()
        del baddata['transaction_id']
        with mock.patch(requests_target):
            get_resp = app.post(return_url, params=baddata)
    assert get_resp.status_code == 302
    resp = app.get(get_resp['Location'])
    assert 'Your payment has been succesfully registered.' not in resp.text
    assert 'the payment service failed to provide a correct answer.' in resp.text
    assert Transaction.objects.get(order_id=transaction_id).status == 0  # not paid

    # call return with signed POST
    data['signed'] = True
    return_url = get_url(with_payment_backend, 'lingo-return', regie)
    with mock.patch(requests_target) as request:
        get_resp = app.post(return_url, params=data)
    url = request.call_args[0][1]
    assert url.startswith('http://example.org/testitem/jump/trigger/paid')
    assert get_resp.status_code == 302
    assert urlparse.urlparse(get_resp['location']).path == '/test_basket_cell/'
    resp = app.get(get_resp['Location'])
    assert 'Your payment has been succesfully registered.' in resp.text
    assert Transaction.objects.get(order_id=transaction_id).status == eopayment.PAID
|
|
|
|
def test_transaction_expiration():
    """``update_transactions`` expires a two-hour-old transaction, not a fresh one."""
    old_transaction = Transaction(status=0)
    old_transaction.save()
    # start_date is assigned after the first save and saved again
    old_transaction.start_date = timezone.now() - timedelta(hours=2)
    old_transaction.save()

    fresh_transaction = Transaction(status=0)
    fresh_transaction.save()

    apps.get_app_config('lingo').update_transactions()

    assert Transaction.objects.get(id=old_transaction.id).status == EXPIRED
    assert Transaction.objects.get(id=fresh_transaction.id).status == 0
|
|
|
|
def test_transaction_validate(app, key, regie, user):
    """Exercise the transaction validation API endpoint.

    Covers, in order: an unsigned request (403), a signed request for an
    unknown transaction (404), a successful validation that records one
    operation, and a backend failure that reports an error without
    recording a second operation.
    """
    paid = Transaction(
        regie=regie, bank_data={'bank': 'data'}, amount=12, status=eopayment.PAID)
    paid.save()

    endpoint = reverse('api-validate-transaction')

    # unsigned request
    app.post_json(endpoint + '?amount=10&transaction_id=0', params={}, status=403)

    # signed request, but no transaction has this id
    unknown_url = sign_url(endpoint + '?amount=10&transaction_id=0&orig=wcs', key)
    app.post_json(unknown_url, params={}, status=404)

    # signed request on the existing transaction
    query = '?amount=10&transaction_id=%s&orig=wcs' % paid.id
    resp = app.post_json(sign_url(endpoint + query, key), params={})
    assert json.loads(resp.content)['err'] == 0
    recorded = TransactionOperation.objects.filter(transaction=paid)
    assert len(recorded) == 1
    assert recorded[0].amount == 10

    # the payment backend refuses the validation: an error is returned and
    # the operation count stays at one
    with mock.patch.object(eopayment.dummy.Payment, 'validate', autospec=True) as mock_validate:
        mock_validate.side_effect = eopayment.ResponseError
        resp = app.post_json(sign_url(endpoint + query, key), params={})
        assert json.loads(resp.content)['err'] == 1
        assert TransactionOperation.objects.filter(transaction=paid).count() == 1
|
|
|
|
def test_transaction_cancel(app, key, regie, user):
    """Exercise the transaction cancellation API endpoint.

    Covers, in order: an unsigned request (403), a signed request for an
    unknown transaction (404), a successful cancellation that records one
    operation, and a backend failure that reports an error without
    recording a second operation.
    """
    paid = Transaction(
        regie=regie, bank_data={'bank': 'data'}, amount=12, status=eopayment.PAID)
    paid.save()

    endpoint = reverse('api-cancel-transaction')
    unknown_query = '?amount=10&transaction_id=0&orig=wcs'

    # unsigned request
    app.post_json(endpoint + unknown_query, params={}, status=403)

    # signed request, but no transaction has this id (sent as a plain POST)
    app.post(sign_url(endpoint + unknown_query, key), params={}, status=404)

    # signed request on the existing transaction
    query = '?amount=10&transaction_id=%s&orig=wcs' % paid.id
    resp = app.post_json(sign_url(endpoint + query, key), params={})
    assert json.loads(resp.content)['err'] == 0
    recorded = TransactionOperation.objects.filter(transaction=paid)
    assert len(recorded) == 1
    assert recorded[0].amount == 10

    # the payment backend refuses the cancellation: an error is returned and
    # the operation count stays at one
    with mock.patch.object(eopayment.dummy.Payment, 'cancel', autospec=True) as mock_cancel:
        mock_cancel.side_effect = eopayment.ResponseError
        resp = app.post_json(sign_url(endpoint + query, key), params={})
        assert json.loads(resp.content)['err'] == 1
        assert TransactionOperation.objects.filter(transaction=paid).count() == 1
|
|
|
|
@contextmanager
def _extra_fees_ws_mock(fee_amount):
    """Patch outgoing requests so they answer with one extra fee of ``fee_amount``.

    Yields the patched ``RequestsSession.request`` mock so the caller can
    inspect the calls that were made while the patch was active.
    """
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
        ws_response = mock.Mock()
        ws_response.status_code = 200
        ws_response.json.return_value = {
            'err': 0, 'data': [{'subject': 'Extra Fees', 'amount': fee_amount}]}
        request.return_value = ws_response
        yield request


def _extra_fees_start_payment(app, basket_page):
    """Submit the basket form and return ``(transaction_id, data)`` read from the redirect.

    ``data`` is the set of parameters later sent to the payment callback.
    """
    resp = login(app).get(basket_page.get_online_url())
    resp = resp.form.submit()
    assert resp.status_code == 302
    qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'ok': True}
    return transaction_id, data


def test_extra_fees(app, basket_page, key, regie, user):
    """Check that extra fees follow the basket content up to the payment.

    With an extra-fees web service configured on the regie, the test checks
    that adding or removing a basket item replaces the extra-fee item with
    the amount returned by the (mocked) web service, that the amount to pay
    includes the fee returned when the payment is started, and that the
    payment callback marks the transaction as paid.
    """
    regie.extra_fees_ws_url = 'http://www.example.net/extra-fees'
    regie.save()

    user_email = 'foo@example.com'
    User.objects.get_or_create(email=user_email)
    add_item_url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email)
    remove_item_url = '%s?email=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_email)

    # first item: a 5 extra fee is created next to it
    amount = 42
    data = {'amount': amount, 'display_name': 'test amount'}
    with _extra_fees_ws_mock('5'):
        resp = app.post_json(sign_url(add_item_url, key), params=data)
        assert resp.status_code == 200
        assert json.loads(resp.content)['result'] == 'success'
        assert BasketItem.objects.filter(amount=amount).exists()
        assert BasketItem.objects.filter(amount=amount)[0].regie_id == regie.id
        assert BasketItem.objects.filter(amount=5, extra_fee=True).exists()
        assert BasketItem.objects.filter(amount=5, extra_fee=True)[0].regie_id == regie.id

    # second item: the web service receives both items and the previous fee
    # is replaced by the new one
    with _extra_fees_ws_mock('7') as request:
        data['amount'] = 43
        resp = app.post_json(sign_url(add_item_url, key), params=data)
        assert request.call_args[0] == ('POST', 'http://www.example.net/extra-fees')
        assert len(json.loads(request.call_args[1]['data'])['data']) == 2

        assert resp.status_code == 200
        assert json.loads(resp.content)['result'] == 'success'
        assert not BasketItem.objects.filter(amount=5, extra_fee=True).exists()
        assert BasketItem.objects.filter(amount=7, extra_fee=True).exists()

    # removing an item also replaces the fee
    with _extra_fees_ws_mock('4'):
        data = {'basket_item_id': BasketItem.objects.get(amount=43).id}
        resp = app.post_json(sign_url(remove_item_url, key), params=data)
        assert resp.status_code == 200
        assert not BasketItem.objects.filter(amount=7, extra_fee=True).exists()
        assert BasketItem.objects.filter(amount=4, extra_fee=True).exists()

    # test payment
    app = login(app)

    # the amount to pay uses the fee returned at payment time: 42 + 2
    with _extra_fees_ws_mock('2'):
        transaction_id, data = _extra_fees_start_payment(app, basket_page)
        assert data['amount'] == '44.00'

    # test again: a new payment picks up the new fee, 42 + 3
    # NOTE(review): this step used to be labelled "without specifying a
    # regie", but the request is the same as the previous one -- confirm
    # what it was meant to cover.
    with _extra_fees_ws_mock('3'):
        transaction_id, data = _extra_fees_start_payment(app, basket_page)
        assert data['amount'] == '45.00'

    # call callback with GET
    callback_url = reverse('lingo-callback', kwargs={'regie_pk': regie.id})
    resp = app.get(callback_url, params=data)
    assert resp.status_code == 200
    assert Transaction.objects.get(order_id=transaction_id).status == eopayment.PAID
|
|
|
|
|
|
@pytest.mark.parametrize('with_payment_backend', [False, True])
def test_payment_callback_error(app, basket_page, regie, user, with_payment_backend):
    """Check that a failing payment notification does not break the callback.

    The basket item's source URL answers with an error while the payment
    callback is processed: the callback must still succeed and mark the
    transaction as paid, leaving the item paid but not notified.  The
    notification is then sent by ``notify_payments()``, but only once the
    payment is old enough.

    Runs once with the regie-based URL and once with the payment-backend
    based URL (see ``get_url``).
    """
    item = BasketItem.objects.create(
        user=user, regie=regie, subject='test_item', amount='10.5',
        source_url='http://example.org/testitem/')

    # start the payment and read the transaction parameters from the redirect
    resp = login(app).get(basket_page.get_online_url())
    resp = resp.form.submit()
    assert resp.status_code == 302
    qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '10.50'

    # call callback with GET, while the remote notification fails
    callback_url = get_url(with_payment_backend, 'lingo-callback', regie)
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
        failing_response = mock.Mock()
        failing_response.status_code = 500
        failing_response.raise_for_status.side_effect = Exception('kaboom')
        request.return_value = failing_response
        get_resp = app.get(callback_url, params=data)
        url = request.call_args[0][1]
        assert url.startswith('http://example.org/testitem/jump/trigger/paid')
        assert get_resp.status_code == 200
        assert Transaction.objects.get(order_id=transaction_id).status == eopayment.PAID
    assert BasketItem.objects.get(id=item.id).payment_date
    assert not BasketItem.objects.get(id=item.id).notification_date

    # too soon
    appconfig = apps.get_app_config('lingo')
    appconfig.notify_payments()
    assert BasketItem.objects.get(id=item.id).payment_date
    assert not BasketItem.objects.get(id=item.id).notification_date

    # fake delay
    basket_item = BasketItem.objects.get(id=item.id)
    basket_item.payment_date = timezone.now() - timedelta(hours=1)
    basket_item.save()

    # the notification now goes through and is recorded on the item
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
        ok_response = mock.Mock()
        ok_response.status_code = 200
        request.return_value = ok_response
        appconfig.notify_payments()
    url = request.call_args[0][1]
    assert url.startswith('http://example.org/testitem/jump/trigger/paid')
    assert BasketItem.objects.get(id=item.id).payment_date
    assert BasketItem.objects.get(id=item.id).notification_date
|