lingo: use shared service keys for signature checks (#10935)
This commit is contained in:
parent
959b436919
commit
5b86500c5e
|
@ -62,6 +62,23 @@ def get_basket_url():
|
|||
return LingoBasketCell.objects.all()[0].page.get_online_url()
|
||||
|
||||
|
||||
def check_request_signature(request):
|
||||
keys = [getattr(settings, 'LINGO_API_SIGN_KEY', '12345')]
|
||||
orig = request.GET.get('orig', '')
|
||||
known_services = getattr(settings, 'KNOWN_SERVICES', [])
|
||||
if known_services and orig:
|
||||
key = None
|
||||
for l in known_services.itervalues():
|
||||
for service in l.itervalues():
|
||||
if 'verif_orig' in service and service['verif_orig'] == orig:
|
||||
key = service['secret']
|
||||
break
|
||||
if key:
|
||||
keys.append(key)
|
||||
break
|
||||
return check_query(request.META['QUERY_STRING'], keys)
|
||||
|
||||
|
||||
class RegiesApiView(ListView):
|
||||
model = Regie
|
||||
|
||||
|
@ -90,8 +107,7 @@ class AddBasketItemApiView(View):
|
|||
return d.quantize(Decimal('0.01'), ROUND_HALF_UP)
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
key = getattr(settings, 'LINGO_API_SIGN_KEY', '12345')
|
||||
if not check_query(request.META['QUERY_STRING'], key):
|
||||
if not check_request_signature(request):
|
||||
return HttpResponseForbidden()
|
||||
|
||||
request_body = json.loads(self.request.body)
|
||||
|
@ -157,8 +173,7 @@ class RemoveBasketItemApiView(View):
|
|||
return super(RemoveBasketItemApiView, self).dispatch(*args, **kwargs)
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
key = getattr(settings, 'LINGO_API_SIGN_KEY', '12345')
|
||||
if not check_query(request.META['QUERY_STRING'], key):
|
||||
if not check_request_signature(request):
|
||||
return HttpResponseForbidden()
|
||||
|
||||
request_body = json.loads(self.request.body)
|
||||
|
@ -198,8 +213,7 @@ class ValidateTransactionApiView(View):
|
|||
return super(ValidateTransactionApiView, self).dispatch(*args, **kwargs)
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
key = getattr(settings, 'LINGO_API_SIGN_KEY', '12345')
|
||||
if not check_query(request.META['QUERY_STRING'], key):
|
||||
if not check_request_signature(request):
|
||||
return HttpResponseForbidden()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -240,8 +254,7 @@ class CancelTransactionApiView(View):
|
|||
return super(CancelTransactionApiView, self).dispatch(*args, **kwargs)
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
key = getattr(settings, 'LINGO_API_SIGN_KEY', '12345')
|
||||
if not check_query(request.META['QUERY_STRING'], key):
|
||||
if not check_request_signature(request):
|
||||
return HttpResponseForbidden()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
|
@ -48,6 +48,22 @@ def user():
|
|||
user = User.objects.create_user('admin', email=None, password='admin')
|
||||
return user
|
||||
|
||||
@pytest.fixture(params=['orig', 'sign_key'])
|
||||
def key(request, settings):
|
||||
if request.param == 'orig':
|
||||
key = 'abcde'
|
||||
settings.KNOWN_SERVICES = {
|
||||
'wcs': {
|
||||
'wcs1': {
|
||||
'verif_orig': 'wcs',
|
||||
'secret': key,
|
||||
},
|
||||
}
|
||||
}
|
||||
return key
|
||||
else:
|
||||
return settings.LINGO_API_SIGN_KEY
|
||||
|
||||
def login(username='admin', password='admin'):
|
||||
resp = client.post('/login/', {'username': username, 'password': password})
|
||||
assert resp.status_code == 302
|
||||
|
@ -113,7 +129,7 @@ def test_successfull_items_payment(regie, user):
|
|||
resp = client.get(reverse('lingo-callback', kwargs={'regie_pk': regie.id}), args)
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_add_amount_to_basket(regie, user):
|
||||
def test_add_amount_to_basket(key, regie, user):
|
||||
other_regie = Regie(label='test2', slug='test2')
|
||||
other_regie.save()
|
||||
|
||||
|
@ -122,8 +138,8 @@ def test_add_amount_to_basket(regie, user):
|
|||
amount = 42
|
||||
data = {'amount': amount, 'display_name': 'test amount',
|
||||
'url': 'http://example.com'}
|
||||
url = '%s?email=%s' % (reverse('api-add-basket-item'), user_email)
|
||||
url = sign_url(url, settings.LINGO_API_SIGN_KEY)
|
||||
url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email)
|
||||
url = sign_url(url, key)
|
||||
resp = client.post(url, json.dumps(data), content_type='application/json')
|
||||
assert resp.status_code == 200
|
||||
assert json.loads(resp.content)['result'] == 'success'
|
||||
|
@ -131,7 +147,7 @@ def test_add_amount_to_basket(regie, user):
|
|||
assert BasketItem.objects.filter(amount=amount)[0].regie_id == regie.id
|
||||
|
||||
data['extra'] = {'amount': '22.22'}
|
||||
url = sign_url(url, settings.LINGO_API_SIGN_KEY)
|
||||
url = sign_url(url, key)
|
||||
resp = client.post(url, json.dumps(data), content_type='application/json')
|
||||
assert resp.status_code == 200
|
||||
assert json.loads(resp.content)['result'] == 'success'
|
||||
|
@ -166,11 +182,11 @@ def test_add_amount_to_basket(regie, user):
|
|||
assert BasketItem.objects.filter(amount=Decimal('22.24')).exists()
|
||||
assert BasketItem.objects.filter(amount=Decimal('22.24'))[0].regie_id == regie.id
|
||||
|
||||
def test_cancel_basket_item(regie, user):
|
||||
def test_cancel_basket_item(key, regie, user):
|
||||
user_email = 'foo@example.com'
|
||||
User.objects.get_or_create(email=user_email)
|
||||
url = '%s?email=%s' % (reverse('api-add-basket-item'), user_email)
|
||||
url = sign_url(url, settings.LINGO_API_SIGN_KEY)
|
||||
url = '%s?email=%s&orig=wcs' % (reverse('api-add-basket-item'), user_email)
|
||||
url = sign_url(url, key)
|
||||
data = {'amount': 42, 'display_name': 'test amount', 'url': 'http://example.com'}
|
||||
resp = client.post(url, json.dumps(data), content_type='application/json')
|
||||
assert resp.status_code == 200
|
||||
|
@ -185,8 +201,8 @@ def test_cancel_basket_item(regie, user):
|
|||
assert BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists()
|
||||
assert BasketItem.objects.filter(amount=21, cancellation_date__isnull=True).exists()
|
||||
|
||||
url = '%s?email=%s' % (reverse('api-remove-basket-item'), user_email)
|
||||
url = sign_url(url, settings.LINGO_API_SIGN_KEY)
|
||||
url = '%s?email=%s&orig=wcs' % (reverse('api-remove-basket-item'), user_email)
|
||||
url = sign_url(url, key)
|
||||
data = {'basket_item_id': basket_item_id, 'skip_notification': True}
|
||||
resp = client.post(url, json.dumps(data), content_type='application/json')
|
||||
assert not BasketItem.objects.filter(amount=42, cancellation_date__isnull=True).exists()
|
||||
|
@ -258,7 +274,7 @@ def test_transaction_expiration():
|
|||
assert Transaction.objects.get(id=t1.id).status == EXPIRED
|
||||
assert Transaction.objects.get(id=t2.id).status == 0
|
||||
|
||||
def test_transaction_validate(regie, user):
|
||||
def test_transaction_validate(key, regie, user):
|
||||
t1 = Transaction(regie=regie, bank_data={'bank': 'data'}, amount=12,
|
||||
status=eopayment.PAID)
|
||||
t1.save()
|
||||
|
@ -267,13 +283,13 @@ def test_transaction_validate(regie, user):
|
|||
resp = client.post(url, content_type='application/json')
|
||||
assert resp.status_code == 403
|
||||
|
||||
url = reverse('api-validate-transaction') + '?amount=10&transaction_id=0'
|
||||
url = sign_url(url, settings.LINGO_API_SIGN_KEY)
|
||||
url = reverse('api-validate-transaction') + '?amount=10&transaction_id=0&orig=wcs'
|
||||
url = sign_url(url, key)
|
||||
resp = client.post(url, content_type='application/json')
|
||||
assert resp.status_code == 404
|
||||
|
||||
url = reverse('api-validate-transaction') + '?amount=10&transaction_id=%s' % t1.id
|
||||
url = sign_url(url, settings.LINGO_API_SIGN_KEY)
|
||||
url = reverse('api-validate-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % t1.id
|
||||
url = sign_url(url, key)
|
||||
resp = client.post(url, content_type='application/json')
|
||||
assert json.loads(resp.content)['err'] == 0
|
||||
operations = TransactionOperation.objects.filter(transaction=t1)
|
||||
|
@ -282,28 +298,28 @@ def test_transaction_validate(regie, user):
|
|||
|
||||
with mock.patch.object(eopayment.dummy.Payment, 'validate', autospec=True) as mock_validate:
|
||||
mock_validate.side_effect = eopayment.ResponseError
|
||||
url = reverse('api-validate-transaction') + '?amount=10&transaction_id=%s' % t1.id
|
||||
url = sign_url(url, settings.LINGO_API_SIGN_KEY)
|
||||
url = reverse('api-validate-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % t1.id
|
||||
url = sign_url(url, key)
|
||||
resp = client.post(url, content_type='application/json')
|
||||
assert json.loads(resp.content)['err'] == 1
|
||||
assert TransactionOperation.objects.filter(transaction=t1).count() == 1
|
||||
|
||||
def test_transaction_cancel(regie, user):
|
||||
def test_transaction_cancel(key, regie, user):
|
||||
t1 = Transaction(regie=regie, bank_data={'bank': 'data'}, amount=12,
|
||||
status=eopayment.PAID)
|
||||
t1.save()
|
||||
|
||||
url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=0'
|
||||
url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=0&orig=wcs'
|
||||
resp = client.post(url, content_type='application/json')
|
||||
assert resp.status_code == 403
|
||||
|
||||
url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=0'
|
||||
url = sign_url(url, settings.LINGO_API_SIGN_KEY)
|
||||
url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=0&orig=wcs'
|
||||
url = sign_url(url, key)
|
||||
resp = client.post(url, content_type='application/json')
|
||||
assert resp.status_code == 404
|
||||
|
||||
url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=%s' % t1.id
|
||||
url = sign_url(url, settings.LINGO_API_SIGN_KEY)
|
||||
url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % t1.id
|
||||
url = sign_url(url, key)
|
||||
resp = client.post(url, content_type='application/json')
|
||||
assert json.loads(resp.content)['err'] == 0
|
||||
operations = TransactionOperation.objects.filter(transaction=t1)
|
||||
|
@ -312,8 +328,8 @@ def test_transaction_cancel(regie, user):
|
|||
|
||||
with mock.patch.object(eopayment.dummy.Payment, 'cancel', autospec=True) as mock_cancel:
|
||||
mock_cancel.side_effect = eopayment.ResponseError
|
||||
url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=%s' % t1.id
|
||||
url = sign_url(url, settings.LINGO_API_SIGN_KEY)
|
||||
url = reverse('api-cancel-transaction') + '?amount=10&transaction_id=%s&orig=wcs' % t1.id
|
||||
url = sign_url(url, key)
|
||||
resp = client.post(url, content_type='application/json')
|
||||
assert json.loads(resp.content)['err'] == 1
|
||||
assert TransactionOperation.objects.filter(transaction=t1).count() == 1
|
||||
|
|
Loading…
Reference in New Issue