# combo/tests/test_lingo_payment.py
#
# 576 lines
# 26 KiB
# Python

import eopayment
import pytest
from datetime import datetime, timedelta
import urlparse
import urllib
from decimal import Decimal
import json
import mock
from django.contrib.auth.models import User
from django.core.urlresolvers import reverse
from django.core.wsgi import get_wsgi_application
from django.conf import settings
from django.utils import timezone
from django.contrib.messages.storage.session import SessionStorage
from webtest import TestApp
from django.test import Client
from combo.data.models import Page
from combo.apps.lingo.models import (Regie, BasketItem, Transaction,
TransactionOperation, RemoteItem, EXPIRED, LingoBasketCell)
from combo.apps.lingo.management.commands.update_transactions import Command as UpdateTransactionsCommand
from combo.utils import sign_url
# Module-level pytest marker: applies the ``django_db`` mark to every test
# defined in this file.
pytestmark = pytest.mark.django_db
# One Django test client instance shared by login() and all the tests below.
client = Client()
@pytest.fixture
def regie():
    """Return the regie whose slug is ``test``, creating it when missing.

    The created regie uses the ``dummy`` service and a minimal payment
    amount of 4.5.
    """
    try:
        return Regie.objects.get(slug='test')
    except Regie.DoesNotExist:
        pass
    # Nothing stored yet: build the regie in one go and persist it.
    created = Regie(
        label='Test',
        slug='test',
        description='test',
        payment_min_amount=Decimal(4.5),
        service='dummy',
        service_options={'siret': '1234'},
    )
    created.save()
    return created
@pytest.fixture
def user():
    """Return the ``admin`` user, creating it (password ``admin``) if needed."""
    try:
        return User.objects.get(username='admin')
    except User.DoesNotExist:
        return User.objects.create_user(
            'admin', password='admin', email='foo@example.com')
@pytest.fixture(params=['orig', 'sign_key'])
def key(request, settings):
    """Return the secret used by the tests to sign API URLs.

    Parametrized over two modes:

    * ``orig``: a ``wcs`` entry is installed in ``settings.KNOWN_SERVICES``
      and its secret is returned;
    * ``sign_key``: ``settings.LINGO_API_SIGN_KEY`` is returned untouched.
    """
    if request.param != 'orig':
        return settings.LINGO_API_SIGN_KEY

    secret = 'abcde'
    wcs_service = {
        'url': 'http://example.org/',
        'verif_orig': 'wcs',
        'secret': secret,
    }
    settings.KNOWN_SERVICES = {'wcs': {'wcs1': wcs_service}}
    return secret
def login(username='admin', password='admin'):
    """Log the shared test client in via ``/login/``; a 302 is expected."""
    credentials = {'username': username, 'password': password}
    response = client.post('/login/', credentials)
    assert response.status_code == 302
def test_default_regie():
    """Check how ``is_default`` moves between regies.

    The first regie saved is the default one, a second one is not, and
    flagging the second as default removes the flag from the first.
    """
    Regie.objects.all().delete()

    first = Regie(label='foo', slug='foo')
    first.save()
    assert bool(first.is_default) is True

    second = Regie(label='bar', slug='bar')
    second.save()
    assert bool(second.is_default) is False

    second.is_default = True
    second.save()

    # Reload both rows from the database before checking the flags.
    second = Regie.objects.get(id=second.id)
    assert bool(second.is_default) is True
    first = Regie.objects.get(id=first.id)
    assert bool(first.is_default) is False
def test_regie_api():
    """The regies API lists nothing at first, then both regies created by
    test_default_regie(), with the default regie first."""
    listing = json.loads(client.get(reverse('api-regies')).content).get('data')
    assert len(listing) == 0

    # Reuse the other test to create two regies (the second one is default).
    test_default_regie()

    listing = json.loads(client.get(reverse('api-regies')).content).get('data')
    assert len(listing) == 2
    assert listing[0]['id'] == Regie.objects.get(is_default=True).slug
def test_payment_min_amount(regie, user):
    """Post a basket worth 3.9 to the pay view and expect a redirect.

    The ``regie`` fixture creates its regie with ``payment_min_amount`` set
    to 4.5, so the basket total (1.5 + 2.4) is below that amount.

    NOTE(review): only the 302 status is asserted; the redirect target is
    not checked here.
    """
    items = {
        'item1': {'amount': '1.5', 'source_url': '/item/1'},
        'item2': {'amount': '2.4', 'source_url': '/item/2'},
    }
    # .items() keeps the loop valid on both Python 2 and Python 3.
    for subject, details in items.items():
        BasketItem.objects.create(user=user, regie=regie,
                                  subject=subject, **details)
    login()
    resp = client.post(reverse('lingo-pay'), {'regie': regie.pk})
    assert resp.status_code == 302
def test_successfull_items_payment(regie, user):
    """Pay a four-item basket, then replay the return URL and the callback.

    Checks that the pay view redirects to a ``dummy-payment`` location, that
    the ``return_url`` found in that location targets the ``lingo-return``
    view of the regie, that requesting it redirects to ``/`` and that the
    callback view answers 200.
    """
    items = {
        'item1': {'amount': '10.5', 'source_url': 'http://example.org/item/1'},
        'item2': {'amount': '42', 'source_url': 'http://example.org/item/2'},
        'item3': {'amount': '100', 'source_url': 'http://example.org/item/3'},
        'item4': {'amount': '354', 'source_url': 'http://example.org/item/4'},
    }
    # .items() keeps the loop valid on both Python 2 and Python 3.
    for subject, details in items.items():
        BasketItem.objects.create(user=user, regie=regie,
                                  subject=subject, **details)
    login()
    resp = client.post(reverse('lingo-pay'), {'regie': regie.pk})
    assert resp.status_code == 302
    location = resp.get('location')
    assert 'dummy-payment' in location
    parsed = urlparse.urlparse(location)
    # get return_url and transaction id from location
    qs = urlparse.parse_qs(parsed.query)
    args = {'transaction_id': qs['transaction_id'][0], 'signed': True,
            'ok': True, 'reason': 'Paid'}
    # make sure return url is the user return URL
    assert urlparse.urlparse(qs['return_url'][0]).path.startswith(
        reverse('lingo-return', kwargs={'regie_pk': regie.id}))
    # simulate successful return URL
    resp = client.get(qs['return_url'][0], args)
    assert resp.status_code == 302
    assert urlparse.urlparse(resp.url).path == '/'
    # simulate successful call to callback URL; RequestsSession.request is
    # patched for the duration of the call.
    with mock.patch('combo.utils.RequestsSession.request'):
        resp = client.get(reverse('lingo-callback', kwargs={'regie_pk': regie.id}), args)
    assert resp.status_code == 200
def test_add_amount_to_basket(key, regie, user):
    """Exercise the add-basket-item API with various amounts and regies.

    Covers: a plain amount, an ``extra`` amount added to it, list amounts
    combined with a query-string amount, selection of the default regie,
    explicit ``regie_id`` given as an id or as a slug, and an unknown regie.
    """
    other_regie = Regie(label='test2', slug='test2')
    other_regie.save()

    user_email = 'foo@example.com'
    User.objects.get_or_create(email=user_email)
    amount = 42
    data = {'amount': amount, 'display_name': 'test amount',
            'url': 'http://example.com'}

    def submit(target):
        # ``data`` is mutated (never rebound) below, so the closure always
        # serializes its current content.
        return client.post(target, json.dumps(data),
                           content_type='application/json')

    def check_success(response):
        assert response.status_code == 200
        assert json.loads(response.content)['result'] == 'success'

    def items_of(value):
        return BasketItem.objects.filter(amount=value)

    def origin_url():
        return '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email)

    def regie_url(regie_ref):
        return '%s?email=%s&regie_id=%s' % (
            reverse('api-add-basket-item'), user_email, regie_ref)

    # plain amount
    check_success(submit(sign_url(origin_url(), key)))
    assert items_of(amount).exists()
    assert items_of(amount)[0].regie_id == regie.id

    # amount plus an extra amount
    data['extra'] = {'amount': '22.22'}
    check_success(submit(sign_url(origin_url(), key)))
    assert items_of(Decimal('64.22')).exists()

    # list amounts, plus an amount appended to the signed query string
    data['amount'] = [amount]
    data['extra'] = {'amount': ['22.22', '12']}
    signed = sign_url(origin_url(), key)
    check_success(submit('%s&amount=5' % signed))
    assert items_of(Decimal('81.22')).exists()

    # switch the default regie
    other_regie.is_default = True
    other_regie.save()
    data['amount'] = []
    data['extra'] = {'amount': '22.23'}
    check_success(submit(sign_url(origin_url(), settings.LINGO_API_SIGN_KEY)))
    assert items_of(Decimal('22.23')).exists()
    assert items_of(Decimal('22.23'))[0].regie_id == other_regie.id

    # explicit regie, referenced by id
    data['extra'] = {'amount': '22.24', 'foo': 'bar'}
    check_success(submit(sign_url(regie_url(regie.id), settings.LINGO_API_SIGN_KEY)))
    assert items_of(Decimal('22.24')).exists()
    assert items_of(Decimal('22.24'))[0].regie_id == regie.id
    assert items_of(Decimal('22.24'))[0].request_data == data['extra']

    # explicit regie, referenced by slug
    data['extra'] = {'amount': '13.67'}
    check_success(submit(sign_url(regie_url(regie.slug), settings.LINGO_API_SIGN_KEY)))
    assert items_of(Decimal('13.67')).exists()
    assert items_of(Decimal('13.67'))[0].regie_id == regie.id

    # unknown regie
    unknown_url = '%s?email=%s&orig=wcs&regie_id=%s' % (
        reverse('api-add-basket-item'), user_email, 'scarecrow')
    resp = submit(sign_url(unknown_url, key))
    assert resp.status_code == 400
    assert resp.content == 'Unknown regie'
def test_cancel_basket_item(key, regie, user):
    """Add two basket items through the API, then remove them one by one.

    The first removal is sent with ``notify`` and must trigger a POST to the
    item's ``jump/trigger/cancelled`` URL; the second is sent without it and
    must not perform any request.
    """
    user_email = 'foo@example.com'
    User.objects.get_or_create(email=user_email)

    def post_json(target, payload):
        return client.post(target, json.dumps(payload),
                           content_type='application/json')

    def active_items(value):
        # Items of that amount which have no cancellation date.
        return BasketItem.objects.filter(amount=value,
                                         cancellation_date__isnull=True)

    # The same signed URL is used for both additions.
    url = sign_url(
        '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email),
        key)

    resp = post_json(url, {'amount': 42, 'display_name': 'test amount',
                           'url': 'http://example.com/', 'notify': 'true'})
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert active_items(42).exists()
    basket_item_id = json.loads(resp.content)['id']

    resp = post_json(url, {'amount': 21, 'display_name': 'test amount',
                           'url': 'http://example.net/'})
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert active_items(42).exists()
    assert active_items(21).exists()
    basket_item_id_2 = json.loads(resp.content)['id']

    with mock.patch('combo.utils.RequestsSession.request') as request:
        removal_url = sign_url(
            '%s?email=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_email),
            key)
        post_json(removal_url, {'basket_item_id': basket_item_id, 'notify': 'true'})
        assert request.call_args[0] == ('POST', u'http://example.com/jump/trigger/cancelled')
    assert not active_items(42).exists()
    assert active_items(21).exists()

    with mock.patch('combo.utils.RequestsSession.request') as request:
        removal_url = sign_url(
            '%s?email=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_email),
            key)
        post_json(removal_url, {'basket_item_id': basket_item_id_2})
        assert request.call_count == 0
    assert not active_items(42).exists()
    assert not active_items(21).exists()
def test_cancel_basket_item_from_cell(key, regie, user):
    """Cancel basket items through the ``lingo-cancel-item`` view.

    Four situations are checked: anonymous request, successful cancellation
    by the logged-in owner, an item added with ``cancellable=no`` and an item
    belonging to another user.
    """
    page = Page(title='xxx', slug='test_basket_cell', template_name='standard')
    page.save()
    cell = LingoBasketCell(page=page, placeholder='content', order=0)
    cell.save()

    def add_item(query, payload):
        # Sign the add-basket-item URL carrying ``query`` and post ``payload``.
        target = sign_url('%s?%s' % (reverse('api-add-basket-item'), query), key)
        return client.post(target, json.dumps(payload),
                           content_type='application/json')

    def cancel(item_pk):
        return client.post(reverse('lingo-cancel-item', kwargs={'pk': item_pk}))

    def last_message():
        storage = SessionStorage(client)
        return storage.deserialize_messages(client.session['_messages'])[-1].message

    resp = add_item('email=%s&orig=wcs' % user.email,
                    {'amount': 42, 'display_name': 'test amount',
                     'url': 'http://example.org/testitem/'})
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists()
    basket_item_id = json.loads(resp.content)['id']

    # check while not logged in
    cancel(basket_item_id)
    assert BasketItem.objects.filter(id=basket_item_id).exists()
    assert (last_message()
            == 'An error occured when removing the item. (no authenticated user)')

    # check a successful case
    login()
    with mock.patch('combo.utils.RequestsSession.request') as request:
        cancel(basket_item_id)
        notified_url = request.call_args[0][1]
        assert notified_url.startswith('http://example.org/testitem/jump/trigger/cancelled')
        assert BasketItem.objects.filter(id=basket_item_id, cancellation_date__isnull=False).exists()

    # check removal of an item that is not cancellable
    resp = add_item('email=%s&cancellable=no&orig=wcs' % user.email,
                    {'amount': 21, 'display_name': 'test amount',
                     'url': 'http://example.org/testitem/'})
    basket_item2_id = json.loads(resp.content)['id']
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    assert BasketItem.objects.filter(amount=21, cancellation_date__isnull=True).exists()
    cancel(basket_item2_id)
    assert last_message() == 'This item cannot be removed.'

    # check removal of the item of another user
    user_email = 'bar@example.com'
    User.objects.get_or_create(email=user_email)
    resp = add_item('email=%s&orig=wcs' % user_email,
                    {'amount': 42, 'display_name': 'test amount',
                     'url': 'http://example.org/testitem/'})
    assert resp.status_code == 200
    assert json.loads(resp.content)['result'] == 'success'
    basket_item_id = json.loads(resp.content)['id']
    resp = cancel(basket_item_id)
    assert resp.status_code == 404
def test_payment_callback(regie, user):
    """Pay two successive baskets and confirm each through the callback view.

    The first payment is confirmed with a GET on the callback URL, the
    second with a POST; in both cases the transaction status must become 3.
    The return view is then requested and must redirect.
    """
    BasketItem.objects.create(user=user, regie=regie,
                              subject='test_item', amount='10.5',
                              source_url='http://example.org/testitem/')
    login()
    resp = client.post(reverse('lingo-pay'), {'regie': regie.pk})
    assert resp.status_code == 302
    location = resp.get('location')
    parsed = urlparse.urlparse(location)
    qs = urlparse.parse_qs(parsed.query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '10.50'

    # call callback with GET
    callback_url = reverse('lingo-callback', kwargs={'regie_pk': regie.id})
    with mock.patch('combo.utils.RequestsSession.request') as request:
        get_resp = client.get(callback_url, data)
        # second positional argument of the patched request() call
        url = request.call_args[0][1]
        assert url.startswith('http://example.org/testitem/jump/trigger/paid')
    assert get_resp.status_code == 200
    # NOTE(review): 3 is presumably eopayment.PAID -- confirm.
    assert Transaction.objects.get(order_id=transaction_id).status == 3

    BasketItem.objects.create(user=user, regie=regie,
                              subject='test_item', amount='11.5',
                              source_url='http://example.org/testitem/')
    resp = client.post(reverse('lingo-pay'), {'regie': regie.pk})
    assert resp.status_code == 302
    location = resp.get('location')
    parsed = urlparse.urlparse(location)
    qs = urlparse.parse_qs(parsed.query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '11.50'

    # call callback with POST
    with mock.patch('combo.utils.RequestsSession.request'):
        post_resp = client.post(callback_url, urllib.urlencode(data),
                                content_type='text/html')
    assert post_resp.status_code == 200
    assert Transaction.objects.get(order_id=transaction_id).status == 3

    # call return view
    get_resp = client.get(reverse('lingo-return', kwargs={'regie_pk': regie.pk}), data)
    assert get_resp.status_code == 302
def test_payment_callback_no_regie(regie, user):
    """Same flow as test_payment_callback, but the pay view is posted
    without a ``regie`` parameter.

    The first payment is confirmed with a GET on the callback URL; for the
    second one only the amount found in the redirect location is checked.
    """
    BasketItem.objects.create(user=user, regie=regie,
                              subject='test_item', amount='10.5',
                              source_url='http://example.org/testitem/')
    login()
    resp = client.post(reverse('lingo-pay'))
    assert resp.status_code == 302
    location = resp.get('location')
    parsed = urlparse.urlparse(location)
    qs = urlparse.parse_qs(parsed.query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '10.50'

    # call callback with GET
    callback_url = reverse('lingo-callback', kwargs={'regie_pk': regie.id})
    with mock.patch('combo.utils.RequestsSession.request') as request:
        get_resp = client.get(callback_url, data)
        # second positional argument of the patched request() call
        url = request.call_args[0][1]
        assert url.startswith('http://example.org/testitem/jump/trigger/paid')
    assert get_resp.status_code == 200
    # NOTE(review): 3 is presumably eopayment.PAID -- confirm.
    assert Transaction.objects.get(order_id=transaction_id).status == 3

    BasketItem.objects.create(user=user, regie=regie,
                              subject='test_item', amount='11.5',
                              source_url='http://example.org/testitem/')
    resp = client.post(reverse('lingo-pay'))
    assert resp.status_code == 302
    location = resp.get('location')
    parsed = urlparse.urlparse(location)
    qs = urlparse.parse_qs(parsed.query)
    transaction_id = qs['transaction_id'][0]
    data = {'transaction_id': transaction_id, 'signed': True,
            'amount': qs['amount'][0], 'ok': True}
    assert data['amount'] == '11.50'
def test_nonexisting_transaction(regie, user):
    """The callback view answers 404 for an unknown transaction id."""
    login()
    params = {
        'transaction_id': 'unknown',
        'signed': True,
        'amount': '23',
        'ok': True,
    }
    # call callback with GET
    response = client.get(
        reverse('lingo-callback', kwargs={'regie_pk': regie.id}), params)
    assert response.status_code == 404
def test_transaction_expiration():
    """The update_transactions command expires a status-0 transaction whose
    start date is two hours old and leaves a freshly saved one untouched."""
    stale = Transaction(status=0)
    stale.save()
    # The start date is overwritten after the first save and saved again.
    # NOTE(review): presumably because start_date is set automatically on
    # creation -- confirm against the model.
    stale.start_date = timezone.now() - timedelta(hours=2)
    stale.save()

    fresh = Transaction(status=0)
    fresh.save()

    UpdateTransactionsCommand().handle()

    assert Transaction.objects.get(id=stale.id).status == EXPIRED
    assert Transaction.objects.get(id=fresh.id).status == 0
def test_transaction_validate(key, regie, user):
    """Exercise the validate-transaction API.

    Unsigned call: 403. Signed call on an unknown transaction: 404. Signed
    call on a paid transaction: ``err`` 0 and one operation of amount 10
    recorded. When the backend's ``validate`` raises ``ResponseError``:
    ``err`` 1 and no additional operation.
    """
    paid = Transaction(regie=regie, bank_data={'bank': 'data'}, amount=12,
                       status=eopayment.PAID)
    paid.save()

    def post(target):
        return client.post(target, content_type='application/json')

    # unsigned request
    resp = post(reverse('api-validate-transaction') + '?amount=10&transaction_id=0')
    assert resp.status_code == 403

    # signed request, unknown transaction
    unknown = reverse('api-validate-transaction') + '?amount=10&transaction_id=0&orig=wcs'
    resp = post(sign_url(unknown, key))
    assert resp.status_code == 404

    # signed request, existing transaction
    known = reverse('api-validate-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % paid.id
    resp = post(sign_url(known, key))
    assert json.loads(resp.content)['err'] == 0
    operations = TransactionOperation.objects.filter(transaction=paid)
    assert len(operations) == 1
    assert operations[0].amount == 10

    # backend failure
    with mock.patch.object(eopayment.dummy.Payment, 'validate', autospec=True) as mock_validate:
        mock_validate.side_effect = eopayment.ResponseError
        known = reverse('api-validate-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % paid.id
        resp = post(sign_url(known, key))
        assert json.loads(resp.content)['err'] == 1
    assert TransactionOperation.objects.filter(transaction=paid).count() == 1
def test_transaction_cancel(key, regie, user):
    """Exercise the cancel-transaction API.

    Unsigned call: 403. Signed call on an unknown transaction: 404. Signed
    call on a paid transaction: ``err`` 0 and one operation of amount 10
    recorded. When the backend's ``cancel`` raises ``ResponseError``:
    ``err`` 1 and no additional operation.
    """
    paid = Transaction(regie=regie, bank_data={'bank': 'data'}, amount=12,
                       status=eopayment.PAID)
    paid.save()

    def post(target):
        return client.post(target, content_type='application/json')

    # unsigned request
    resp = post(reverse('api-cancel-transaction') + '?amount=10&transaction_id=0&orig=wcs')
    assert resp.status_code == 403

    # signed request, unknown transaction
    unknown = reverse('api-cancel-transaction') + '?amount=10&transaction_id=0&orig=wcs'
    resp = post(sign_url(unknown, key))
    assert resp.status_code == 404

    # signed request, existing transaction
    known = reverse('api-cancel-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % paid.id
    resp = post(sign_url(known, key))
    assert json.loads(resp.content)['err'] == 0
    operations = TransactionOperation.objects.filter(transaction=paid)
    assert len(operations) == 1
    assert operations[0].amount == 10

    # backend failure
    with mock.patch.object(eopayment.dummy.Payment, 'cancel', autospec=True) as mock_cancel:
        mock_cancel.side_effect = eopayment.ResponseError
        known = reverse('api-cancel-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % paid.id
        resp = post(sign_url(known, key))
        assert json.loads(resp.content)['err'] == 1
    assert TransactionOperation.objects.filter(transaction=paid).count() == 1
def test_extra_fees(key, regie, user):
    """Check extra-fee basket items driven by the regie's extra-fees URL.

    ``RequestsSession.request`` is patched to answer with a single
    ``Extra Fees`` entry whose amount changes at each step; the test checks
    that the extra-fee basket item follows that amount when items are added
    or removed, and that it is included in the amount sent to payment.
    """
    regie.extra_fees_ws_url = 'http://www.example.net/extra-fees'
    regie.save()

    user_email = 'foo@example.com'
    User.objects.get_or_create(email=user_email)
    amount = 42
    data = {'amount': amount, 'display_name': 'test amount'}

    def fees_reply(fee):
        # Fake response whose JSON body announces one extra fee of ``fee``.
        reply = mock.Mock()
        reply.status_code = 200
        reply.json.return_value = {'err': 0, 'data': [{'subject': 'Extra Fees', 'amount': fee}]}
        return reply

    def signed_api_url(view_name):
        return sign_url('%s?email=%s&orig=wcs' % (reverse(view_name), user_email), key)

    def post_json(target, payload):
        return client.post(target, json.dumps(payload),
                           content_type='application/json')

    def fee_items(value):
        return BasketItem.objects.filter(amount=value, extra_fee=True)

    # first item: an extra fee of 5 is added
    with mock.patch('combo.utils.RequestsSession.request') as request:
        request.return_value = fees_reply('5')
        resp = post_json(signed_api_url('api-add-basket-item'), data)
        assert resp.status_code == 200
        assert json.loads(resp.content)['result'] == 'success'
        assert BasketItem.objects.filter(amount=amount).exists()
        assert BasketItem.objects.filter(amount=amount)[0].regie_id == regie.id
        assert fee_items(5).exists()
        assert fee_items(5)[0].regie_id == regie.id

    # second item: the extra fee is replaced by one of 7
    with mock.patch('combo.utils.RequestsSession.request') as request:
        request.return_value = fees_reply('7')
        data['amount'] = 43
        resp = post_json(signed_api_url('api-add-basket-item'), data)
        assert request.call_args[0] == ('POST', 'http://www.example.net/extra-fees')
        assert len(json.loads(request.call_args[1]['data'])['data']) == 2
        assert resp.status_code == 200
        assert json.loads(resp.content)['result'] == 'success'
        assert not fee_items(5).exists()
        assert fee_items(7).exists()

    # removal of the second item: the extra fee becomes 4
    with mock.patch('combo.utils.RequestsSession.request') as request:
        request.return_value = fees_reply('4')
        url = signed_api_url('api-remove-basket-item')
        data = {'basket_item_id': BasketItem.objects.get(amount=43).id}
        resp = post_json(url, data)
        assert resp.status_code == 200
        assert not fee_items(7).exists()
        assert fee_items(4).exists()

    # test payment
    login()
    with mock.patch('combo.utils.RequestsSession.request') as request:
        request.return_value = fees_reply('2')
        resp = client.post(reverse('lingo-pay'), {'regie': regie.pk})
        assert resp.status_code == 302
        qs = urlparse.parse_qs(urlparse.urlparse(resp.get('location')).query)
        transaction_id = qs['transaction_id'][0]
        data = {'transaction_id': transaction_id, 'signed': True,
                'amount': qs['amount'][0], 'ok': True}
        assert data['amount'] == '44.00'

    # test again, without specifying a regie
    with mock.patch('combo.utils.RequestsSession.request') as request:
        request.return_value = fees_reply('3')
        resp = client.post(reverse('lingo-pay'))
        assert resp.status_code == 302
        qs = urlparse.parse_qs(urlparse.urlparse(resp.get('location')).query)
        transaction_id = qs['transaction_id'][0]
        data = {'transaction_id': transaction_id, 'signed': True,
                'amount': qs['amount'][0], 'ok': True}
        assert data['amount'] == '45.00'

        # call callback with GET
        # NOTE(review): the source's indentation was lost; the callback is
        # assumed to run while the request patch is still active -- confirm.
        callback_url = reverse('lingo-callback', kwargs={'regie_pk': regie.id})
        resp = client.get(callback_url, data)
        assert resp.status_code == 200
        assert Transaction.objects.get(order_id=transaction_id).status == 3