340 lines
13 KiB
Python
340 lines
13 KiB
Python
import json
|
|
import pytest
|
|
import mock
|
|
import urlparse
|
|
from decimal import Decimal
|
|
from requests.exceptions import ConnectionError
|
|
|
|
from django.test.client import RequestFactory
|
|
from django.template import Context
|
|
from django.core.urlresolvers import reverse
|
|
from django.conf import settings
|
|
from django.core.management import call_command
|
|
|
|
from combo.utils import check_query, aes_hex_encrypt
|
|
from combo.data.models import Page
|
|
from combo.apps.lingo.models import (Regie, ActiveItems, ItemsHistory, SelfDeclaredInvoicePayment,
|
|
Transaction, BasketItem)
|
|
|
|
# Module-level pytest mark: applies pytest.mark.django_db to every test
# collected from this file.
pytestmark = pytest.mark.django_db
|
|
|
|
|
|
# Sample invoice list used as the 'data' payload of the mocked regie
# webservice responses in the tests below.
INVOICES = [
    dict(
        id='F201601',
        display_id='F-2016-One',
        label='invoice-one',
        regie='remote',
        created='2016-02-02',
        pay_limit_date='2999-12-31',
        total_amount='123.45',
        amount='123.45',
        has_pdf=True,
        online_payment=True,
        paid=False,
        payment_date=None,
        no_online_payment_reason='',
    ),
]
|
|
|
|
@pytest.fixture
def remote_regie():
    """Return the Regie whose slug is 'remote', creating it when missing.

    A newly created regie uses the 'dummy' service and gets a
    webservice_url, which is what the tests rely on when they assert
    is_remote().
    """
    try:
        return Regie.objects.get(slug='remote')
    except Regie.DoesNotExist:
        pass

    regie = Regie()
    # Attributes are applied in the same order as plain assignments would be.
    attributes = (
        ('label', 'Remote'),
        ('slug', 'remote'),
        ('description', 'remote'),
        ('payment_min_amount', Decimal(2.0)),
        ('service', 'dummy'),
        ('service_options', {'siret': '1234'}),
        ('webservice_url', 'http://example.org/regie'),  # is_remote
    )
    for attribute, value in attributes:
        setattr(regie, attribute, value)
    regie.save()
    return regie
|
|
|
|
|
|
class _MockSAMLUser(object):
    """Fake SAML identifier entry exposing a fixed name_id."""

    name_id = 'r2d2'


class _MockSAMLUsers(object):
    """Fake collection offering the exists()/first() calls used on
    MockUser.saml_identifiers; it always reports one entry."""

    def exists(self):
        return True

    def first(self):
        return _MockSAMLUser()


class MockUser(object):
    """Authenticated user double carrying an email and a SAML identifier."""

    email = 'foo@example.net'

    def __init__(self):
        self.saml_identifiers = _MockSAMLUsers()

    def is_authenticated(self):
        return True
|
|
|
|
@mock.patch('combo.utils.RequestsSession.request')
def test_remote_regie_active_invoices_cell(mock_request, remote_regie):
    """Render an ActiveItems cell for a remote regie with a mocked webservice.

    Checks that the invoice returned by the mocked webservice appears in the
    rendered cell, that the regie was queried with a GET on /regie/invoices/
    carrying the user's NameID and a query string accepted by check_query,
    and that an empty invoice list renders 'No items yet'.
    """
    def ws_response(payload):
        # Mocked webservice answer exposing the payload both as raw content
        # and through .json().
        response = mock.Mock(status_code=200, content=json.dumps(payload))
        response.json.return_value = payload
        return response

    assert remote_regie.is_remote()

    page = Page(title='xxx', slug='test_basket_cell', template_name='standard')
    page.save()
    cell = ActiveItems(regie='remote', page=page, placeholder='content', order=0)

    user = MockUser()
    context = Context({'request': RequestFactory().get('/')})
    context['synchronous'] = True  # to get fresh content
    context['user'] = user
    context['request'].user = user

    assert cell.is_relevant(context) is True

    # show regie with an invoice
    mock_request.return_value = ws_response({'err': 0, 'data': INVOICES})
    content = cell.render(context)
    assert 'F-2016-One' in content
    assert '123.45' in content

    # check if regie webservice has been correctly called
    assert mock_request.call_args[0][0] == 'GET'
    parsed = urlparse.urlparse(mock_request.call_args[0][1])
    assert parsed.scheme == 'http'
    assert parsed.netloc == 'example.org'
    assert parsed.path == '/regie/invoices/'
    query = urlparse.parse_qs(parsed.query, keep_blank_values=True)
    assert query['NameID'][0] == 'r2d2'
    assert query['orig'][0] == 'combo'
    assert check_query(parsed.query, 'combo')

    # with no invoice
    mock_request.return_value = ws_response({'err': 0, 'data': []})
    content = cell.render(context)
    assert 'No items yet' in content
|
|
|
|
@mock.patch('combo.utils.RequestsSession.request')
def test_remote_regie_past_invoices_cell(mock_request, remote_regie):
    """Render an ItemsHistory cell for a remote regie with a mocked webservice.

    Checks that the invoice returned by the mocked webservice appears in the
    rendered cell, that the regie was queried with a GET on
    /regie/invoices/history/ carrying the user's NameID and a query string
    accepted by check_query, and that an empty invoice list renders
    'No items yet'.
    """
    def ws_response(payload):
        # Mocked webservice answer exposing the payload both as raw content
        # and through .json().
        response = mock.Mock(status_code=200, content=json.dumps(payload))
        response.json.return_value = payload
        return response

    assert remote_regie.is_remote()

    page = Page(title='xxx', slug='test_basket_cell', template_name='standard')
    page.save()
    cell = ItemsHistory(regie='remote', page=page, placeholder='content', order=0)

    user = MockUser()
    context = Context({'request': RequestFactory().get('/')})
    context['synchronous'] = True  # to get fresh content
    context['user'] = user
    context['request'].user = user

    assert cell.is_relevant(context) is True

    # show regie with an invoice
    mock_request.return_value = ws_response({'err': 0, 'data': INVOICES})
    content = cell.render(context)
    assert 'F-2016-One' in content
    assert '123.45' in content

    # check if regie webservice has been correctly called
    assert mock_request.call_args[0][0] == 'GET'
    parsed = urlparse.urlparse(mock_request.call_args[0][1])
    assert parsed.scheme == 'http'
    assert parsed.netloc == 'example.org'
    assert parsed.path == '/regie/invoices/history/'
    query = urlparse.parse_qs(parsed.query, keep_blank_values=True)
    assert query['NameID'][0] == 'r2d2'
    assert query['orig'][0] == 'combo'
    assert check_query(parsed.query, 'combo')

    # with no invoice
    mock_request.return_value = ws_response({'err': 0, 'data': []})
    content = cell.render(context)
    assert 'No items yet' in content
|
|
|
|
@mock.patch('combo.apps.lingo.models.Regie.pay_invoice')
@mock.patch('combo.apps.lingo.models.requests.get')
def test_anonymous_successful_item_payment(mock_get, mock_pay_invoice, app, remote_regie):
    """Pay a remote invoice anonymously through the item page.

    The invoice lookup (requests.get) and Regie.pay_invoice are mocked. The
    test checks the payment form fields, submits it with an email, follows
    the redirection to the dummy payment backend, then simulates both the
    user return URL and the backend callback and checks that a Transaction
    holding the matching BasketItem exists.
    """
    assert remote_regie.is_remote()
    encrypt_id = aes_hex_encrypt(settings.SECRET_KEY, 'F201601')
    item_url = '/lingo/item/%s/%s/' % (remote_regie.id, encrypt_id)

    mock_json = mock.Mock()
    mock_json.json.return_value = {'err': 0, 'data': INVOICES[0]}
    mock_get.return_value = mock_json
    mock_pay_invoice.return_value = mock.Mock(status_code=200)

    resp = app.get(item_url)
    form = resp.form

    assert 'email' in form.fields
    assert form['email'].value == ''
    assert 'item_url' in form.fields
    assert form['item_url'].value == item_url
    assert 'item' in form.fields
    assert form['item'].value == 'F201601'
    assert 'regie' in form.fields
    assert form['regie'].value == unicode(remote_regie.pk)

    form['email'] = 'ghost@buster.com'
    resp = form.submit()

    assert resp.status_code == 302
    location = resp.location
    assert 'dummy-payment' in location

    # get return_url and transaction id from location
    qs = urlparse.parse_qs(urlparse.urlparse(location).query)
    return_url = qs['return_url'][0]
    args = {'transaction_id': qs['transaction_id'][0], 'signed': True,
            'ok': True, 'reason': 'Paid'}

    # make sure return url is the user return URL
    assert urlparse.urlparse(return_url).path.startswith(
        reverse('lingo-return', kwargs={'regie_pk': remote_regie.id}))

    # simulate successful return URL
    resp = app.get(return_url, params=args)
    assert resp.status_code == 302
    assert urlparse.urlparse(resp.url).path == '/'

    # simulate successful call to callback URL
    resp = app.get(reverse('lingo-callback', kwargs={'regie_pk': remote_regie.id}), params=args)
    assert resp.status_code == 200

    transactions = Transaction.objects.all()
    basket_items = BasketItem.objects.all()
    assert transactions
    assert basket_items

    transaction = transactions[0]
    basket_item = basket_items[0]
    assert basket_item.subject == 'Invoice #%s' % INVOICES[0]['display_id']
    assert basket_item.amount == Decimal(INVOICES[0]['amount'])
    assert basket_item in transaction.items.all()
|
|
|
|
@mock.patch('combo.apps.lingo.models.requests.get')
def test_anonymous_item_payment_email_error(mock_get, app, remote_regie):
    """Submit the anonymous payment form without an email.

    The invoice lookup is mocked; the form is submitted untouched and the
    response is expected to redirect back to the item page.
    """
    assert remote_regie.is_remote()
    encrypt_id = aes_hex_encrypt(settings.SECRET_KEY, 'F201601')
    item_url = '/lingo/item/%s/%s/' % (remote_regie.id, encrypt_id)

    mock_json = mock.Mock()
    mock_json.json.return_value = {'err': 0, 'data': INVOICES[0]}
    mock_get.return_value = mock_json

    resp = app.get(item_url).form.submit()

    assert resp.status_code == 302
    assert urlparse.urlparse(resp.location).path == item_url
|
|
|
|
@mock.patch('combo.apps.lingo.models.requests.get')
def test_wrong_crypted_item(mock_get, remote_regie, app):
    """Request an item page with an identifier that is not a valid
    encrypted id and expect a 404 response."""
    assert remote_regie.is_remote()
    mock_json = mock.Mock()
    mock_json.json.return_value = {'err': 0, 'data': INVOICES[0]}
    mock_get.return_value = mock_json
    # status=404 makes the test client itself check the response status.
    app.get('/lingo/item/%s/%s/' % (remote_regie.id, 'zrzer854sfaear45e6rzerzerzef'), status=404)
|
|
|
|
@mock.patch('combo.apps.lingo.models.requests.get')
def test_self_declared_invoice(mock_get, app, remote_regie):
    """Exercise the SelfDeclaredInvoicePayment cell form on a page.

    The form is submitted empty, with a malformed amount, with another
    invoice number, with a wrong amount and finally with the number and
    amount of the mocked invoice, which must redirect to the item page.

    NOTE(review): the four message assertions below test a bare non-empty
    string literal, which is always true, so they do not inspect the
    response. They are kept unchanged; turning them into real content checks
    needs to be validated against the view and page template first (the
    mocked requests.get returns the same invoice whatever number is asked).
    """
    invoice_response = mock.Mock()
    invoice_response.json.return_value = {'err': 0, 'data': INVOICES[0]}
    mock_get.return_value = invoice_response

    page = Page(title='xxx', slug='test-self-invoice', template_name='standard')
    page.save()
    cell = SelfDeclaredInvoicePayment(regie='remote', page=page, placeholder='content', order=0)
    cell.save()

    def submit_form(number=None, amount=None):
        # Load the page, fill in the provided fields and submit the form.
        form = app.get('/test-self-invoice/').form
        if number is not None:
            form['invoice-number'] = number
        if amount is not None:
            form['invoice-amount'] = amount
        return form.submit()

    # empty form
    submit_form().follow()
    assert 'Sorry, no invoice were found with that number and amount.'

    # wrong format
    submit_form(number='F201601', amount='FOOBAR').follow()
    assert 'Sorry, the provided amount is invalid.'

    # invalid number
    submit_form(number='F201602', amount='123.45').follow()
    assert 'Sorry, no invoice were found with that number and amount.'

    # invalid amount
    submit_form(number='F201601', amount='123.46').follow()
    assert 'Sorry, no invoice were found with that number and amount.'

    # matching number and amount: redirection to the item page
    redirect = submit_form(number='F201601', amount='123.45')
    target_path = urlparse.urlparse(redirect.location).path
    assert target_path.startswith('/lingo/item/%s/' % remote_regie.id)
    redirect.follow()
|
|
|
|
|
|
@mock.patch('combo.apps.lingo.models.Regie.pay_invoice')
@mock.patch('combo.apps.lingo.models.requests.get')
@mock.patch('combo.apps.lingo.models.requests.post')
def test_remote_item_payment_failure(mock_post, mock_get, mock_pay_invoice, app, remote_regie):
    """Pay a remote invoice while the regie is unreachable at callback time.

    The payment flow is run up to the backend callback, during which
    requests.get raises ConnectionError: the transaction must exist with
    to_be_paid_remote_items set and no BasketItem. Once requests.get works
    again, the update_transactions command must create the BasketItem and
    reset to_be_paid_remote_items to None. The command is then run a second
    time without further assertions.
    """
    assert remote_regie.is_remote()
    encrypted = aes_hex_encrypt(settings.SECRET_KEY, 'F201601')
    item_page_url = '/lingo/item/%s/%s/' % (remote_regie.id, encrypted)

    invoice_response = mock.Mock()
    invoice_response.json.return_value = {'err': 0, 'data': INVOICES[0]}
    mock_get.return_value = invoice_response
    mock_pay_invoice.return_value = mock.Mock(status_code=200)

    payment_form = app.get(item_page_url).form

    for field_name, expected_value in (
            ('email', ''),
            ('item_url', item_page_url),
            ('item', 'F201601'),
            ('regie', unicode(remote_regie.pk))):
        assert field_name in payment_form.fields
        assert payment_form[field_name].value == expected_value

    payment_form['email'] = 'test@example.net'
    resp = payment_form.submit()

    assert resp.status_code == 302
    location = resp.location
    assert 'dummy-payment' in location

    # get return_url and transaction id from location
    location_query = urlparse.parse_qs(urlparse.urlparse(location).query)
    return_url = location_query['return_url'][0]
    args = {'transaction_id': location_query['transaction_id'][0], 'signed': True,
            'ok': True, 'reason': 'Paid'}

    # make sure return url is the user return URL
    expected_prefix = reverse('lingo-return', kwargs={'regie_pk': remote_regie.id})
    assert urlparse.urlparse(return_url).path.startswith(expected_prefix)

    # simulate successful return URL
    resp = app.get(return_url, params=args)
    assert resp.status_code == 302
    assert urlparse.urlparse(resp.url).path == '/'

    # simulate the callback URL being called while the remote regie
    # cannot be reached
    mock_get.side_effect = ConnectionError('where is my hostname?')
    callback_url = reverse('lingo-callback', kwargs={'regie_pk': remote_regie.id})
    resp = app.get(callback_url, params=args)
    transactions = Transaction.objects.all()
    basket_items = BasketItem.objects.all()

    assert transactions.count() == 1
    assert not basket_items
    assert transactions[0].to_be_paid_remote_items
    assert resp.status_code == 200

    # remote regie reachable again: pending items get processed
    mock_get.side_effect = None
    call_command('update_transactions')

    assert Transaction.objects.count() == 1
    assert BasketItem.objects.count() == 1
    assert Transaction.objects.all()[0].to_be_paid_remote_items is None

    call_command('update_transactions')
|