from contextlib import contextmanager import eopayment import pytest from datetime import datetime, timedelta import urllib from decimal import Decimal import json import mock from django.apps import apps from django.contrib.auth.models import User from django.core.urlresolvers import reverse from django.core.wsgi import get_wsgi_application from django.conf import settings from django.utils import timezone from django.utils.six.moves.urllib import parse as urlparse from django.contrib.messages.storage.session import SessionStorage from webtest import TestApp from combo.data.models import Page from combo.apps.lingo.models import ( Regie, BasketItem, Transaction, TransactionOperation, RemoteItem, EXPIRED, LingoBasketCell, PaymentBackend) from combo.utils import sign_url from .test_manager import login pytestmark = pytest.mark.django_db def get_url(with_payment_backend, view_name, regie): if with_payment_backend: return reverse( view_name + '-payment-backend', kwargs={'payment_backend_pk': regie.payment_backend.id}) return reverse(view_name, kwargs={'regie_pk': regie.id}) @contextmanager def check_log(caplog, message): idx = len(caplog.records) yield assert any(message in record.message for record in caplog.records[idx:]), \ '%r not found in log records' % message @pytest.fixture def regie(): try: payment_backend = PaymentBackend.objects.get(slug='test1') except PaymentBackend.DoesNotExist: payment_backend = PaymentBackend.objects.create( label='test1', slug='test1', service='dummy', service_options={'siret': '1234'}) try: regie = Regie.objects.get(slug='test') except Regie.DoesNotExist: regie = Regie() regie.label = 'Test' regie.slug = 'test' regie.description = 'test' regie.payment_min_amount = Decimal(4.5) regie.payment_backend = payment_backend regie.save() return regie @pytest.fixture def basket_page(): page = Page(title='xxx', slug='test_basket_cell', template_name='standard') page.save() cell = LingoBasketCell(page=page, placeholder='content', order=0) cell.save() return page @pytest.fixture def user(): try: user = User.objects.get(username='admin') except User.DoesNotExist: user = User.objects.create_user('admin', password='admin', email='foo@example.com') return user @pytest.fixture(params=['orig', 'sign_key']) def key(request, settings): if request.param == 'orig': key = 'abcde' settings.KNOWN_SERVICES = { 'wcs': { 'wcs1': { 'url': 'http://example.org/', 'verif_orig': 'wcs', 'secret': key, }, } } return key else: return settings.LINGO_API_SIGN_KEY def test_default_regie(): payment_backend = PaymentBackend.objects.create(label='foo', slug='foo') Regie.objects.all().delete() regie1 = Regie(label='foo', slug='foo', payment_backend=payment_backend) regie1.save() assert bool(regie1.is_default) is True regie2 = Regie(label='bar', slug='bar', payment_backend=payment_backend) regie2.save() assert bool(regie2.is_default) is False regie2.is_default = True regie2.save() regie2 = Regie.objects.get(id=regie2.id) assert bool(regie2.is_default) is True regie1 = Regie.objects.get(id=regie1.id) assert bool(regie1.is_default) is False def test_regie_api(app): resp = app.get(reverse('api-regies')) assert len(json.loads(resp.content).get('data')) == 0 test_default_regie() resp = app.get(reverse('api-regies')) assert len(json.loads(resp.content).get('data')) == 2 assert json.loads(resp.content).get('data')[0]['id'] == Regie.objects.get(is_default=True).slug def test_payment_min_amount(app, basket_page, regie, user): items = {'item1': {'amount': '1.5', 'source_url': '/item/1'}, 'item2': {'amount': '2.4', 'source_url': '/item/2'} } b_items = [] for subject, details in items.items(): b_item = BasketItem.objects.create(user=user, regie=regie, subject=subject, **details) b_items.append(b_item.pk) resp = login(app).get('/test_basket_cell/') resp = resp.form.submit() assert resp.status_code == 302 def test_transaction_manual_validation(app, basket_page, user, monkeypatch): pb = PaymentBackend.objects.create( label='test1', slug='test1', service='payzen', service_options={'vads_site_id': '12345678', 'secret_test': 'plkoBdfcx987dhft6'} ) regie = Regie.objects.create( label='Test', slug='test', description='test', payment_backend=pb, transaction_options={'manual_validation': True}) BasketItem.objects.create( user=user, regie=regie, subject='item1', amount='1.5', source_url='/item/1') class MockPayment(object): request = mock.Mock(return_value=(9876, 3, {})) def get_eopayment_object(*args, **kwargs): return MockPayment import combo.apps.lingo.views monkeypatch.setattr(combo.apps.lingo.views, 'get_eopayment_object', get_eopayment_object) resp = login(app).get('/test_basket_cell/') resp = resp.form.submit() assert MockPayment.request.call_args.kwargs['manual_validation'] is True @pytest.mark.parametrize('with_payment_backend', [False, True]) def test_successfull_items_payment(app, basket_page, regie, user, with_payment_backend): items = {'item1': {'amount': '10.5', 'source_url': 'http://example.org/item/1'}, 'item2': {'amount': '42', 'source_url': 'http://example.org/item/2'}, 'item3': {'amount': '100', 'source_url': 'http://example.org/item/3'}, 'item4': {'amount': '354', 'source_url': 'http://example.org/item/4'} } b_items = [] for subject, details in items.items(): b_item = BasketItem.objects.create(user=user, regie=regie, subject=subject, **details) b_items.append(b_item.pk) resp = login(app).get('/test_basket_cell/') resp = resp.form.submit() assert resp.status_code == 302 location = resp.location assert 'dummy-payment' in location parsed = urlparse.urlparse(location) # get return_url and transaction id from location qs = urlparse.parse_qs(parsed.query) args = {'transaction_id': qs['transaction_id'][0], 'signed': True, 'ok': True, 'reason': 'Paid'} # make sure return url is the user return URL assert urlparse.urlparse(qs['return_url'][0]).path.startswith( reverse('lingo-return-payment-backend', kwargs={'payment_backend_pk': regie.payment_backend.id})) # simulate successful call to callback URL with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: resp = app.get(get_url(with_payment_backend, 'lingo-callback', regie), params=args) assert resp.status_code == 200 # simulate successful return URL resp = app.get(qs['return_url'][0], params=args) assert resp.status_code == 302 assert urlparse.urlparse(resp.url).path == '/test_basket_cell/' resp = resp.follow() assert 'Your payment has been succesfully registered.' in resp.text def test_add_amount_to_basket(app, key, regie, user): payment_backend = PaymentBackend.objects.create( label='test2', slug='test2', service='dummy', service_options={'siret': '1234'}) other_regie = Regie(label='test2', slug='test2', payment_backend=payment_backend) other_regie.save() user_email = 'foo@example.com' User.objects.get_or_create(email=user_email) amount = 42 data = {'amount': amount, 'display_name': 'test amount', 'url': 'http://example.com'} url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email) url = sign_url(url, key) resp = app.post_json(url, params=data) assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' assert BasketItem.objects.filter(amount=amount).exists() assert BasketItem.objects.filter(amount=amount)[0].regie_id == regie.id data['extra'] = {'amount': '22.22'} url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email) url = sign_url(url, key) resp = app.post_json(url, params=data) assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' assert BasketItem.objects.filter(amount=Decimal('64.22')).exists() data['amount'] = [amount] data['extra'] = {'amount': ['22.22', '12']} url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email) url = sign_url(url, key) resp = app.post_json('%s&amount=5' % url, params=data) assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' assert BasketItem.objects.filter(amount=Decimal('81.22')).exists() other_regie.is_default = True other_regie.save() data['amount'] = [] data['extra'] = {'amount': '22.23'} url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email) url = sign_url(url, settings.LINGO_API_SIGN_KEY) resp = app.post_json(url, params=data) item = BasketItem.objects.get(amount=Decimal('22.23')) assert resp.status_code == 200 response = json.loads(resp.content) assert response['result'] == 'success' assert response['payment_url'].endswith('/lingo/item/%s/pay' % item.id) assert BasketItem.objects.filter(amount=Decimal('22.23')).exists() assert BasketItem.objects.filter(amount=Decimal('22.23'))[0].regie_id == other_regie.id url = '%s?email=%s®ie_id=%s' % ( reverse('api-add-basket-item'), user_email, regie.id) data['extra'] = {'amount': '22.24', 'foo': 'bar'} url = sign_url(url, settings.LINGO_API_SIGN_KEY) resp = app.post_json(url, params=data) assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' assert BasketItem.objects.filter(amount=Decimal('22.24')).exists() assert BasketItem.objects.filter(amount=Decimal('22.24'))[0].regie_id == regie.id assert BasketItem.objects.filter(amount=Decimal('22.24'))[0].request_data == data['extra'] url = '%s?email=%s®ie_id=%s' % ( reverse('api-add-basket-item'), user_email, regie.slug) data['extra'] = {'amount': '13.67'} url = sign_url(url, settings.LINGO_API_SIGN_KEY) resp = app.post_json(url, params=data) assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' assert BasketItem.objects.filter(amount=Decimal('13.67')).exists() assert BasketItem.objects.filter(amount=Decimal('13.67'))[0].regie_id == regie.id url = '%s?email=%s&orig=wcs®ie_id=%s' % (reverse('api-add-basket-item'), user_email, 'scarecrow') url = sign_url(url, key) resp = app.post_json(url, params=data, status=400) assert resp.text == 'Unknown regie' def test_basket_item_with_capture_date(app, user, regie, basket_page, monkeypatch): User.objects.get_or_create(email=user.email) url = '%s?email=%s' % (reverse('api-add-basket-item'), user.email) capture_date = timezone.now().date() data = { 'amount': 10, 'capture_date': capture_date.isoformat(), 'display_name': 'test item' } url = sign_url(url, settings.LINGO_API_SIGN_KEY) resp = app.post_json(url, params=data) assert resp.status_code == 200 assert BasketItem.objects.all()[0].capture_date == capture_date resp = login(app).get('/test_basket_cell/') import eopayment eopayment_mock = mock.Mock( return_value=('orderid', eopayment.URL, 'http://dummy-payment.demo.entrouvert.com/')) monkeypatch.setattr(eopayment.Payment, 'request', eopayment_mock) resp = resp.form.submit() assert resp.status_code == 302 location = urlparse.urlparse(resp.location) assert location.path == '/' assert location.hostname == 'dummy-payment.demo.entrouvert.com' eopayment_mock.assert_called_once_with( Decimal(10), email=user.email, first_name=user.first_name, last_name=user.last_name, capture_date=capture_date) @pytest.mark.parametrize("invalid_capture_date", [8, '', 'not-a-date']) def test_add_basket_capture_date_format(app, user, regie, invalid_capture_date): url = '%s?email=%s' % (reverse('api-add-basket-item'), user.email) data = {'amount': 10, 'display_name': 'test item'} data['capture_date'] = invalid_capture_date url = sign_url(url, settings.LINGO_API_SIGN_KEY) resp = app.post_json(url, params=data, status=400) assert 'Bad format for capture date, it should be yyyy-mm-dd.' in resp.content def test_cant_pay_if_different_capture_date(app, basket_page, regie, user): capture1 = (timezone.now() + timedelta(days=1)).date() capture2 = (timezone.now() + timedelta(days=2)).date() items = { 'item1': { 'amount': '10.5', 'capture_date': capture1.isoformat(), 'source_url': 'http://example.org/item/1' }, 'item2': { 'amount': '42', 'capture_date': capture2.isoformat(), 'source_url': 'http://example.org/item/2'}, } b_items = [] for subject, details in items.items(): b_item = BasketItem.objects.create( user=user, regie=regie, subject=subject, **details) b_items.append(b_item.pk) resp = login(app).get('/test_basket_cell/') resp = resp.form.submit() assert resp.status_code == 302 assert urlparse.urlparse(resp.location).path == '/test_basket_cell/' resp = resp.follow() assert "Invalid grouping for basket items: different capture dates." in resp.content def test_pay_single_basket_item(app, key, regie, user, john_doe): page = Page(title='xxx', slug='index', template_name='standard') page.save() cell = LingoBasketCell(page=page, placeholder='content', order=0) cell.save() amount = 12 data = {'amount': amount, 'display_name': 'test amount', 'url': 'http://example.com'} url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user.email) url = sign_url(url, key) resp = app.post_json(url, params=data) # check that an unpaid item exists in basket assert BasketItem.objects.filter(regie=regie, amount=amount, payment_date__isnull=True).exists() payment_url = resp.json['payment_url'] resp = app.get(payment_url, status=403) assert 'No item payment allowed for anonymous users.' in resp.text login(app, username='john.doe', password='john.doe') resp = app.get(payment_url, status=403) assert 'Wrong item: payment not allowed.' in resp.text # forbid payment to regie with extra_fees_ws_url regie.extra_fees_ws_url = 'http://example.com/extra-fees' regie.save() app.reset() login(app) resp = app.get(payment_url, status=403) assert 'No item payment allowed as extra fees set.' in resp.text regie.extra_fees_ws_url = '' regie.save() resp = app.get(payment_url, params={'next_url': 'http://example.net/form/id/'}) # make sure the redirection is done to the payment backend assert resp.location.startswith('http://dummy-payment.demo.entrouvert.com/') qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query) assert qs['amount'] == ['12.00'] # simulate successful payment response from dummy backend data = {'transaction_id': qs['transaction_id'][0], 'ok': True, 'amount': qs['amount'][0], 'signed': True} # simulate payment service redirecting the user to /lingo/return/... (eopayment # dummy module put that URL in return_url query string parameter). resp = app.get(qs['return_url'][0], params=data) # check that item is paid assert BasketItem.objects.filter(regie=regie, amount=amount, payment_date__isnull=False).exists() # check that user is redirected to the next_url passed previously assert resp.location == 'http://example.net/form/id/' def test_pay_multiple_regies(app, key, regie, user): test_add_amount_to_basket(app, key, regie, user) page = Page(title='xxx', slug='test_basket_cell', template_name='standard') page.save() cell = LingoBasketCell(page=page, placeholder='content', order=0) cell.save() resp = login(app).get(page.get_online_url()) resp = resp.forms[0].submit() assert resp.location.startswith('http://dummy-payment.demo.entrouvert.com/') qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query) assert qs['amount'] == ['223.35'] resp = login(app).get(page.get_online_url()) resp = resp.forms[1].submit() qs = urlparse.parse_qs(urlparse.urlparse(resp.location).query) assert qs['amount'] == ['22.23'] def test_pay_as_anonymous_user(app, key, regie, user): test_add_amount_to_basket(app, key, regie, user) page = Page(title='xxx', slug='test_basket_cell', template_name='standard') page.save() cell = LingoBasketCell(page=page, placeholder='content', order=0) cell.save() resp = login(app).get(page.get_online_url()) app.cookiejar.clear(domain='testserver.local', path='/', name='sessionid') resp = resp.forms[0].submit().follow() assert 'Payment requires to be logged in.' in resp.text def test_cancel_basket_item(app, key, regie, user): user_email = 'foo@example.com' User.objects.get_or_create(email=user_email) url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email) url = sign_url(url, key) data = {'amount': 42, 'display_name': 'test amount', 'url': 'http://example.com/', 'notify': 'true'} resp = app.post_json(url, params=data) assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' assert BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists() basket_item_id = json.loads(resp.content)['id'] data = {'amount': 21, 'display_name': 'test amount', 'url': 'http://example.net/'} resp = app.post_json(url, params=data) assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' assert BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists() assert BasketItem.objects.filter(amount=21, cancellation_date__isnull=True).exists() basket_item_id_2 = json.loads(resp.content)['id'] with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: url = '%s?email=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_email) url = sign_url(url, key) data = {'basket_item_id': basket_item_id, 'notify': 'true'} resp = app.post_json(url, params=data) assert request.call_args[0] == ('POST', u'http://example.com/jump/trigger/cancelled') assert not BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists() assert BasketItem.objects.filter(amount=21, cancellation_date__isnull=True).exists() with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: url = '%s?email=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_email) url = sign_url(url, key) data = {'basket_item_id': basket_item_id_2} resp = app.post_json(url, params=data) assert request.call_count == 0 assert not BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists() assert not BasketItem.objects.filter(amount=21, cancellation_date__isnull=True).exists() def test_cancel_basket_item_from_cell(app, key, regie, user): page = Page(title='xxx', slug='test_basket_cell', template_name='standard') page.save() cell = LingoBasketCell(page=page, placeholder='content', order=0) cell.save() url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user.email) url = sign_url(url, key) data = {'amount': 42, 'display_name': 'test amount', 'url': 'http://example.org/testitem/'} resp = app.post_json(url, params=data) assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' assert BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists() basket_item_id = json.loads(resp.content)['id'] # check while not logged in resp = app.get(reverse('lingo-cancel-item', kwargs={'pk': basket_item_id}), status=404) assert BasketItem.objects.filter(id=basket_item_id).exists() # check a successful case app = login(app) with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: resp = app.get(reverse('lingo-cancel-item', kwargs={'pk': basket_item_id})) resp = resp.form.submit() url = request.call_args[0][1] assert url.startswith('http://example.org/testitem/jump/trigger/cancelled') assert BasketItem.objects.filter(id=basket_item_id, cancellation_date__isnull=False).exists() # check removal of an item that is not cancellable url = '%s?email=%s&cancellable=no&orig=wcs' % (reverse('api-add-basket-item'), user.email) url = sign_url(url, key) data = {'amount': 21, 'display_name': 'test amount', 'url': 'http://example.org/testitem/'} resp = app.post_json(url, params=data) basket_item2_id = json.loads(resp.content)['id'] assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' assert BasketItem.objects.filter(amount=21, cancellation_date__isnull=True).exists() resp = app.get(reverse('lingo-cancel-item', kwargs={'pk': basket_item2_id})) resp = resp.form.submit() resp = resp.follow() assert 'This item cannot be removed.' in resp.text # check removal of the item of another user user_email = 'bar@example.com' User.objects.get_or_create(email=user_email) url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email) url = sign_url(url, key) data = {'amount': 42, 'display_name': 'test amount', 'url': 'http://example.org/testitem/'} resp = app.post_json(url, params=data) assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' basket_item_id = json.loads(resp.content)['id'] app.get(reverse('lingo-cancel-item', kwargs={'pk': basket_item_id}), status=404) app.post(reverse('lingo-cancel-item', kwargs={'pk': basket_item_id}), status=403) @pytest.mark.parametrize('with_payment_backend', [False, True]) def test_payment_callback(app, basket_page, regie, user, with_payment_backend): page = Page(title='xxx', slug='index', template_name='standard') page.save() item = BasketItem.objects.create(user=user, regie=regie, subject='test_item', amount='10.5', source_url='http://example.org/testitem/') resp = login(app).get(basket_page.get_online_url()) resp = resp.form.submit() assert resp.status_code == 302 location = resp.location parsed = urlparse.urlparse(location) qs = urlparse.parse_qs(parsed.query) transaction_id = qs['transaction_id'][0] data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True} assert data['amount'] == '10.50' # call callback with GET callback_url = get_url(with_payment_backend, 'lingo-callback', regie) with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: get_resp = app.get(callback_url, params=data) url = request.call_args[0][1] assert url.startswith('http://example.org/testitem/jump/trigger/paid') assert get_resp.status_code == 200 assert Transaction.objects.get(order_id=transaction_id).status == 3 item = BasketItem.objects.create(user=user, regie=regie, subject='test_item', amount='11.5', source_url='http://example.org/testitem/') resp = login(app).get(basket_page.get_online_url()) resp = resp.form.submit() assert resp.status_code == 302 location = resp.location parsed = urlparse.urlparse(location) qs = urlparse.parse_qs(parsed.query) transaction_id = qs['transaction_id'][0] data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True} assert data['amount'] == '11.50' # call callback with POST with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: post_resp = app.post(callback_url, params=data) assert post_resp.status_code == 200 assert Transaction.objects.get(order_id=transaction_id).status == 3 # call return view return_url = get_url(with_payment_backend, 'lingo-return', regie) get_resp = app.get(return_url, params=data) assert get_resp.status_code == 302 resp = app.get(get_resp['Location']) assert 'Your payment has been succesfully registered.' in resp.text @pytest.mark.parametrize('with_payment_backend', [False, True]) def test_payment_callback_no_regie(app, basket_page, regie, user, with_payment_backend): item = BasketItem.objects.create(user=user, regie=regie, subject='test_item', amount='10.5', source_url='http://example.org/testitem/') resp = login(app).get(basket_page.get_online_url()) resp = resp.form.submit() assert resp.status_code == 302 location = resp.location parsed = urlparse.urlparse(location) qs = urlparse.parse_qs(parsed.query) transaction_id = qs['transaction_id'][0] data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True} assert data['amount'] == '10.50' # call callback with GET callback_url = get_url(with_payment_backend, 'lingo-callback', regie) with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: get_resp = app.get(callback_url, params=data) url = request.call_args[0][1] assert url.startswith('http://example.org/testitem/jump/trigger/paid') assert get_resp.status_code == 200 assert Transaction.objects.get(order_id=transaction_id).status == 3 item = BasketItem.objects.create(user=user, regie=regie, subject='test_item', amount='11.5', source_url='http://example.org/testitem/') resp = login(app).get(basket_page.get_online_url()) resp = resp.form.submit() assert resp.status_code == 302 location = resp.location parsed = urlparse.urlparse(location) qs = urlparse.parse_qs(parsed.query) transaction_id = qs['transaction_id'][0] data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True} assert data['amount'] == '11.50' @pytest.mark.parametrize('with_payment_backend', [False, True]) def test_nonexisting_transaction(app, regie, user, with_payment_backend): app = login(app) data = {'transaction_id': 'unknown', 'signed': True, 'amount': '23', 'ok': True} # call callback with GET callback_url = get_url(with_payment_backend, 'lingo-callback', regie) app.get(callback_url, params=data, status=404) @pytest.mark.parametrize('with_payment_backend', [False, True]) def test_payment_callback_waiting(app, basket_page, regie, user, with_payment_backend): item = BasketItem.objects.create(user=user, regie=regie, subject='test_item', amount='10.5', source_url='http://example.org/testitem/') resp = login(app).get(basket_page.get_online_url()) resp = resp.form.submit() assert resp.status_code == 302 location = resp.location parsed = urlparse.urlparse(location) qs = urlparse.parse_qs(parsed.query) transaction_id = qs['transaction_id'][0] data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'waiting': True} assert data['amount'] == '10.50' # callback with WAITING state callback_url = get_url(with_payment_backend, 'lingo-callback', regie) resp = app.get(callback_url, params=data) assert resp.status_code == 200 assert Transaction.objects.get(order_id=transaction_id).status == eopayment.WAITING assert BasketItem.objects.get(id=item.id).waiting_date assert not BasketItem.objects.get(id=item.id).payment_date assert BasketItem.get_items_to_be_paid(user).count() == 0 # callback with PAID state data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True} with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: resp = app.get(callback_url, params=data) assert resp.status_code == 200 url = request.call_args[0][1] assert url.startswith('http://example.org/testitem/jump/trigger/paid') assert Transaction.objects.get(order_id=transaction_id).status == eopayment.PAID assert BasketItem.objects.get(id=item.id).payment_date assert BasketItem.get_items_to_be_paid(user).count() == 0 @pytest.mark.parametrize('with_payment_backend', [False, True]) def test_payment_no_callback_just_return( caplog, app, basket_page, regie, user, with_payment_backend): item = BasketItem.objects.create(user=user, regie=regie, subject='test_item', amount='10.5', source_url='http://example.org/testitem/') resp = login(app).get(basket_page.get_online_url()) resp = resp.form.submit() assert resp.status_code == 302 location = resp.location parsed = urlparse.urlparse(location) qs = urlparse.parse_qs(parsed.query) transaction_id = qs['transaction_id'][0] data = {'transaction_id': transaction_id, 'amount': qs['amount'][0], 'ok': True} assert data['amount'] == '10.50' # call return with unsigned POST with check_log(caplog, 'received unsigned payment'): return_url = get_url(with_payment_backend, 'lingo-return', regie) with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: get_resp = app.post(return_url, params=data) assert request.call_count == 0 assert get_resp.status_code == 302 assert urlparse.urlparse(get_resp['location']).path == '/test_basket_cell/' assert Transaction.objects.get(order_id=transaction_id).status == 0 # not paid # call return with missing data with check_log(caplog, 'failed to process payment response: missing transaction_id'): baddata = data.copy() del baddata['transaction_id'] with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: get_resp = app.post(return_url, params=baddata) assert get_resp.status_code == 302 resp = app.get(get_resp['Location']) assert 'Your payment has been succesfully registered.' not in resp.text assert 'the payment service failed to provide a correct answer.' in resp.text assert Transaction.objects.get(order_id=transaction_id).status == 0 # not paid # call return with signed POST data['signed'] = True return_url = get_url(with_payment_backend, 'lingo-return', regie) with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: get_resp = app.post(return_url, params=data) url = request.call_args[0][1] assert url.startswith('http://example.org/testitem/jump/trigger/paid') assert get_resp.status_code == 302 assert urlparse.urlparse(get_resp['location']).path == '/test_basket_cell/' resp = app.get(get_resp['Location']) assert 'Your payment has been succesfully registered.' in resp.text assert Transaction.objects.get(order_id=transaction_id).status == eopayment.PAID def test_transaction_expiration(): t1 = Transaction(status=0) t1.save() t1.start_date = timezone.now() - timedelta(hours=2) t1.save() t2 = Transaction(status=0) t2.save() appconfig = apps.get_app_config('lingo') appconfig.update_transactions() assert Transaction.objects.get(id=t1.id).status == EXPIRED assert Transaction.objects.get(id=t2.id).status == 0 def test_transaction_validate(app, key, regie, user): t1 = Transaction(regie=regie, bank_data={'bank': 'data'}, amount=12, status=eopayment.PAID) t1.save() url = reverse('api-validate-transaction') + '?amount=10&transaction_id=0' resp = app.post_json(url, params={}, status=403) url = reverse('api-validate-transaction') + '?amount=10&transaction_id=0&orig=wcs' url = sign_url(url, key) resp = app.post_json(url, params={}, status=404) url = reverse('api-validate-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % t1.id url = sign_url(url, key) resp = app.post_json(url, params={}) assert json.loads(resp.content)['err'] == 0 operations = TransactionOperation.objects.filter(transaction=t1) assert len(operations) == 1 assert operations[0].amount == 10 with mock.patch.object(eopayment.dummy.Payment, 'validate', autospec=True) as mock_validate: mock_validate.side_effect = eopayment.ResponseError url = reverse('api-validate-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % t1.id url = sign_url(url, key) resp = app.post_json(url, params={}) assert json.loads(resp.content)['err'] == 1 assert TransactionOperation.objects.filter(transaction=t1).count() == 1 def test_transaction_cancel(app, key, regie, user): t1 = Transaction(regie=regie, bank_data={'bank': 'data'}, amount=12, status=eopayment.PAID) t1.save() url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=0&orig=wcs' resp = app.post_json(url, params={}, status=403) url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=0&orig=wcs' url = sign_url(url, key) resp = app.post(url, params={}, status=404) url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % t1.id url = sign_url(url, key) resp = app.post_json(url, params={}) assert json.loads(resp.content)['err'] == 0 operations = TransactionOperation.objects.filter(transaction=t1) assert len(operations) == 1 assert operations[0].amount == 10 with mock.patch.object(eopayment.dummy.Payment, 'cancel', autospec=True) as mock_cancel: mock_cancel.side_effect = eopayment.ResponseError url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % t1.id url = sign_url(url, key) resp = app.post_json(url, params={}) assert json.loads(resp.content)['err'] == 1 assert TransactionOperation.objects.filter(transaction=t1).count() == 1 def test_extra_fees(app, basket_page, key, regie, user): regie.extra_fees_ws_url = 'http://www.example.net/extra-fees' regie.save() user_email = 'foo@example.com' User.objects.get_or_create(email=user_email) amount = 42 data = {'amount': amount, 'display_name': 'test amount'} with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: mock_json = mock.Mock() mock_json.status_code = 200 mock_json.json.return_value = {'err': 0, 'data': [{'subject': 'Extra Fees', 'amount': '5'}]} request.return_value = mock_json url = sign_url('%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email), key) resp = app.post_json(url, params=data) assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' assert BasketItem.objects.filter(amount=amount).exists() assert BasketItem.objects.filter(amount=amount)[0].regie_id == regie.id assert BasketItem.objects.filter(amount=5, extra_fee=True).exists() assert BasketItem.objects.filter(amount=5, extra_fee=True)[0].regie_id == regie.id with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: mock_json = mock.Mock() mock_json.status_code = 200 mock_json.json.return_value = {'err': 0, 'data': [{'subject': 'Extra Fees', 'amount': '7'}]} request.return_value = mock_json data['amount'] = 43 url = sign_url('%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email), key) resp = app.post_json(url, params=data) assert request.call_args[0] == ('POST', 'http://www.example.net/extra-fees') assert len(json.loads(request.call_args[1]['data'])['data']) == 2 assert resp.status_code == 200 assert json.loads(resp.content)['result'] == 'success' assert not BasketItem.objects.filter(amount=5, extra_fee=True).exists() assert BasketItem.objects.filter(amount=7, extra_fee=True).exists() with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: mock_json = mock.Mock() mock_json.status_code = 200 mock_json.json.return_value = {'err': 0, 'data': [{'subject': 'Extra Fees', 'amount': '4'}]} request.return_value = mock_json url = sign_url('%s?email=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_email), key) data = {'basket_item_id': BasketItem.objects.get(amount=43).id} resp = app.post_json(url, params=data) assert resp.status_code == 200 assert not BasketItem.objects.filter(amount=7, extra_fee=True).exists() assert BasketItem.objects.filter(amount=4, extra_fee=True).exists() # test payment app = login(app) with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: mock_json = mock.Mock() mock_json.status_code = 200 mock_json.json.return_value = {'err': 0, 'data': [{'subject': 'Extra Fees', 'amount': '2'}]} request.return_value = mock_json resp = login(app).get(basket_page.get_online_url()) resp = resp.form.submit() assert resp.status_code == 302 location = resp.location parsed = urlparse.urlparse(location) qs = urlparse.parse_qs(parsed.query) transaction_id = qs['transaction_id'][0] data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True} assert data['amount'] == '44.00' # test again, without specifying a regie with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: mock_json = mock.Mock() mock_json.status_code = 200 mock_json.json.return_value = {'err': 0, 'data': [{'subject': 'Extra Fees', 'amount': '3'}]} request.return_value = mock_json resp = login(app).get(basket_page.get_online_url()) resp = resp.form.submit() assert resp.status_code == 302 location = resp.location parsed = urlparse.urlparse(location) qs = urlparse.parse_qs(parsed.query) transaction_id = qs['transaction_id'][0] data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True} assert data['amount'] == '45.00' # call callback with GET callback_url = reverse('lingo-callback', kwargs={'regie_pk': regie.id}) resp = app.get(callback_url, params=data) assert resp.status_code == 200 assert Transaction.objects.get(order_id=transaction_id).status == 3 @pytest.mark.parametrize('with_payment_backend', [False, True]) def test_payment_callback_error(app, basket_page, regie, user, with_payment_backend): item = BasketItem.objects.create(user=user, regie=regie, subject='test_item', amount='10.5', source_url='http://example.org/testitem/') resp = login(app).get(basket_page.get_online_url()) resp = resp.form.submit() assert resp.status_code == 302 location = resp.location parsed = urlparse.urlparse(location) qs = urlparse.parse_qs(parsed.query) transaction_id = qs['transaction_id'][0] data = {'transaction_id': transaction_id, 'signed': True, 'amount': qs['amount'][0], 'ok': True} assert data['amount'] == '10.50' # call callback with GET callback_url = get_url(with_payment_backend, 'lingo-callback', regie) with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: mock_response = mock.Mock() def kaboom(): raise Exception('kaboom') mock_response.status_code = 500 mock_response.raise_for_status = kaboom request.return_value = mock_response get_resp = app.get(callback_url, params=data) url = request.call_args[0][1] assert url.startswith('http://example.org/testitem/jump/trigger/paid') assert get_resp.status_code == 200 assert Transaction.objects.get(order_id=transaction_id).status == 3 assert BasketItem.objects.get(id=item.id).payment_date assert not BasketItem.objects.get(id=item.id).notification_date # too soon appconfig = apps.get_app_config('lingo') appconfig.notify_payments() assert BasketItem.objects.get(id=item.id).payment_date assert not BasketItem.objects.get(id=item.id).notification_date # fake delay basket_item = BasketItem.objects.get(id=item.id) basket_item.payment_date = timezone.now() - timedelta(hours=1) basket_item.save() with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request: mock_response = mock.Mock() mock_response.status_code = 200 request.return_value = mock_response appconfig.notify_payments() url = request.call_args[0][1] assert url.startswith('http://example.org/testitem/jump/trigger/paid') assert BasketItem.objects.get(id=item.id).payment_date assert BasketItem.objects.get(id=item.id).notification_date