lingo: get name id from user object instead of session (#13124)
This commit is contained in:
parent
1bb00cd1c5
commit
005d003292
|
@ -19,7 +19,6 @@
|
|||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import requests
|
||||
import urlparse
|
||||
|
||||
from dateutil import parser
|
||||
|
@ -41,7 +40,7 @@ from ckeditor.fields import RichTextField
|
|||
|
||||
from combo.data.models import CellBase
|
||||
from combo.data.library import register_cell_class
|
||||
from combo.utils import NothingInCacheException, sign_url, aes_hex_encrypt
|
||||
from combo.utils import NothingInCacheException, sign_url, aes_hex_encrypt, requests
|
||||
|
||||
EXPIRED = 9999
|
||||
|
||||
|
@ -121,68 +120,41 @@ class Regie(models.Model):
|
|||
return self.text_on_success
|
||||
return _('Your payment has been succesfully registered.')
|
||||
|
||||
def get_past_items(self, context):
|
||||
"""
|
||||
returns past items
|
||||
"""
|
||||
return self.get_items(context, past=True)
|
||||
|
||||
|
||||
def get_items(self, context, past=False):
|
||||
"""
|
||||
returns current or past items
|
||||
"""
|
||||
def get_invoices(self, user, history=False):
|
||||
if not self.is_remote():
|
||||
payed = not past
|
||||
return self.basketitem_set.filter(payment_date__isnull=payed,
|
||||
user=context.get('user'))
|
||||
if context.get('user'):
|
||||
if context.get('request') and hasattr(context['request'], 'session') and \
|
||||
context['request'].session.get('mellon_session'):
|
||||
mellon = context.get('request').session['mellon_session']
|
||||
url = self.webservice_url + '/invoices/'
|
||||
if past:
|
||||
url += 'history/'
|
||||
items = requests.get(self.signed_url(context['request'], url,
|
||||
NameID=mellon['name_id_content'])).json()
|
||||
if items.get('data'):
|
||||
return [build_remote_item(item, self) for item in items.get('data')]
|
||||
return []
|
||||
return self.basketitem_set.filter(payment_date__isnull=boot(not history), user=user)
|
||||
if user:
|
||||
url = self.webservice_url + '/invoices/'
|
||||
if history:
|
||||
url += 'history/'
|
||||
items = requests.get(url, user=user, remote_service='auto').json()
|
||||
if items.get('data'):
|
||||
return [build_remote_item(item, self) for item in items.get('data')]
|
||||
return []
|
||||
return []
|
||||
|
||||
def download_item(self, request, item_id):
|
||||
def get_invoice(self, user, invoice_id):
|
||||
if not self.is_remote():
|
||||
return self.basketitem_set.get(pk=invoice_id)
|
||||
url = self.webservice_url + '/invoice/%s/' % invoice_id
|
||||
item = requests.get(url, user=user, remote_service='auto').json()
|
||||
return build_remote_item(item.get('data'), self)
|
||||
|
||||
def get_invoice_pdf(self, user, invoice_id):
|
||||
"""
|
||||
downloads item's file
|
||||
"""
|
||||
if self.is_remote():
|
||||
if hasattr(request, 'session') and request.session.get('mellon_session'):
|
||||
mellon = request.session.get('mellon_session')
|
||||
url = self.webservice_url + '/invoice/%s/pdf/' % item_id
|
||||
return requests.get(self.signed_url(request, url,
|
||||
NameID=mellon['name_id_content']))
|
||||
raise PermissionDenied
|
||||
if self.is_remote() and user:
|
||||
url = self.webservice_url + '/invoice/%s/pdf/' % invoice_id
|
||||
return requests.get(url, user=user, remote_service='auto')
|
||||
raise PermissionDenied
|
||||
|
||||
def get_item(self, request, item):
|
||||
if not self.is_remote():
|
||||
return self.basketitem_set.get(pk=item)
|
||||
|
||||
if hasattr(request, 'session') and request.session.get('mellon_session'):
|
||||
mellon = request.session.get('mellon_session')
|
||||
url = self.webservice_url + '/invoice/%s/' % item
|
||||
item = requests.get(self.signed_url(request, url,
|
||||
NameID=mellon['name_id_content'])).json()
|
||||
return build_remote_item(item.get('data'), self)
|
||||
else:
|
||||
url = self.webservice_url + '/invoice/%s/' % item
|
||||
item = requests.get(self.signed_url(request, url)).json()
|
||||
return build_remote_item(item.get('data'), self)
|
||||
|
||||
def pay_item(self, request, item_id, transaction_id, transaction_date):
|
||||
url = self.webservice_url + '/invoice/%s/pay/' % item_id
|
||||
def pay_invoice(self, invoice_id, transaction_id, transaction_date):
|
||||
url = self.webservice_url + '/invoice/%s/pay/' % invoice_id
|
||||
data = {'transaction_id': transaction_id,
|
||||
'transaction_date': transaction_date.strftime('%Y-%m-%dT%H:%M:%S')}
|
||||
headers = {'content-type': 'application/json'}
|
||||
return requests.post(self.signed_url(request, url),
|
||||
return requests.post(url, remote_service='auto',
|
||||
data=json.dumps(data), headers=headers).json()
|
||||
|
||||
def as_api_dict(self):
|
||||
|
@ -190,12 +162,6 @@ class Regie(models.Model):
|
|||
'label': self.label,
|
||||
'description': self.description}
|
||||
|
||||
def signed_url(self, request, url, **params):
|
||||
orig = request.get_host().split(':')[0]
|
||||
url += '?orig=' + orig +'&' + urlencode(params)
|
||||
signature_key = settings.LINGO_SIGNATURE_KEY
|
||||
return sign_url(url, key=signature_key)
|
||||
|
||||
|
||||
class BasketItem(models.Model):
|
||||
user = models.ForeignKey(settings.AUTH_USER_MODEL)
|
||||
|
@ -484,8 +450,7 @@ class Items(CellBase):
|
|||
|
||||
def get_cell_extra_context(self, context):
|
||||
ctx = {'title': self.title, 'text': self.text}
|
||||
items = self.get_items()
|
||||
# sort items by creation date
|
||||
items = self.get_invoices(user=context['user'])
|
||||
items.sort(key=lambda i: i.creation_date, reverse=True)
|
||||
ctx.update({'items': items})
|
||||
return ctx
|
||||
|
@ -503,10 +468,10 @@ class ItemsHistory(Items):
|
|||
class Meta:
|
||||
verbose_name = _('Items History Cell')
|
||||
|
||||
def get_items(self):
|
||||
def get_invoices(self, user):
|
||||
items = []
|
||||
for r in self.get_regies():
|
||||
items.extend(r.get_past_items(self.context))
|
||||
items.extend(r.get_invoices(user, history=True))
|
||||
return items
|
||||
|
||||
|
||||
|
@ -516,8 +481,8 @@ class ActiveItems(Items):
|
|||
class Meta:
|
||||
verbose_name = _('Active Items Cell')
|
||||
|
||||
def get_items(self):
|
||||
def get_invoices(self, user):
|
||||
items = []
|
||||
for r in self.get_regies():
|
||||
items.extend(r.get_items(self.context))
|
||||
items.extend(r.get_invoices(user))
|
||||
return items
|
||||
|
|
|
@ -301,8 +301,8 @@ class PayView(View):
|
|||
items = []
|
||||
remote_items_data = []
|
||||
# get all items data from regie webservice
|
||||
for item in request.POST.getlist('item'):
|
||||
remote_items_data.append(regie.get_item(request, item))
|
||||
for item_id in request.POST.getlist('item'):
|
||||
remote_items_data.append(regie.get_invoice(request.user, item_id))
|
||||
remote_items = ','.join([x.id for x in remote_items_data])
|
||||
else:
|
||||
items = BasketItem.objects.filter(id__in=request.POST.getlist('item'), regie=regie)
|
||||
|
@ -434,9 +434,8 @@ class CallbackView(View):
|
|||
# ignore errors, it should be retried later on if it fails
|
||||
pass
|
||||
if transaction.remote_items:
|
||||
for item in transaction.remote_items.split(','):
|
||||
regie.pay_item(request, item, transaction.order_id,
|
||||
transaction.end_date)
|
||||
for item_id in transaction.remote_items.split(','):
|
||||
regie.pay_invoice(item_id, transaction.order_id, transaction.end_date)
|
||||
|
||||
return HttpResponse()
|
||||
|
||||
|
@ -510,7 +509,7 @@ class ItemDownloadView(View):
|
|||
raise Http404()
|
||||
|
||||
try:
|
||||
data = regie.download_item(request, item_id)
|
||||
data = regie.get_invoice_pdf(request.user, item_id)
|
||||
except PermissionDenied:
|
||||
return HttpResponseForbidden()
|
||||
except DecryptionError as e:
|
||||
|
@ -533,7 +532,7 @@ class ItemView(TemplateView):
|
|||
|
||||
def get_context_data(self, **kwargs):
|
||||
ret = {'item_url': self.request.get_full_path()}
|
||||
|
||||
|
||||
try:
|
||||
regie = Regie.objects.get(pk=kwargs['regie_id'])
|
||||
except Regie.DoesNotExist:
|
||||
|
@ -543,8 +542,8 @@ class ItemView(TemplateView):
|
|||
item_id = aes_hex_decrypt(settings.SECRET_KEY, kwargs['item_crypto_id'])
|
||||
except DecryptionError:
|
||||
raise Http404()
|
||||
|
||||
item = regie.get_item(self.request, item_id)
|
||||
|
||||
item = regie.get_invoice(self.request.user, item_id)
|
||||
if not item:
|
||||
raise Http404(_('No item was found.'))
|
||||
ret.update({'item': item, 'regie': regie})
|
||||
|
|
|
@ -47,21 +47,28 @@ def remote_regie():
|
|||
regie.payment_min_amount = Decimal(2.0)
|
||||
regie.service = 'dummy'
|
||||
regie.service_options = {'siret': '1234'}
|
||||
regie.webservice_url = 'http://passerelle.example.net/regie' # is_remote
|
||||
regie.webservice_url = 'http://example.org/regie' # is_remote
|
||||
regie.save()
|
||||
return regie
|
||||
|
||||
@pytest.fixture
|
||||
def user():
|
||||
try:
|
||||
user = User.objects.get(username='admin')
|
||||
except User.DoesNotExist:
|
||||
user = User.objects.create_user('admin', email=None, password='admin')
|
||||
return user
|
||||
|
||||
class MockUser(object):
|
||||
email = 'foo@example.net'
|
||||
def is_authenticated(self):
|
||||
return True
|
||||
|
||||
@mock.patch('combo.apps.lingo.models.requests.get')
|
||||
def test_remote_regie_cell(mock_get, remote_regie, user):
|
||||
def __init__(self):
|
||||
class MockSAMLUsers(object):
|
||||
def exists(self):
|
||||
return True
|
||||
def first(self):
|
||||
class MockSAMLUser(object):
|
||||
name_id = 'r2d2'
|
||||
return MockSAMLUser()
|
||||
self.saml_identifiers = MockSAMLUsers()
|
||||
|
||||
@mock.patch('combo.utils.RequestsSession.request')
|
||||
def test_remote_regie_cell(mock_request, remote_regie):
|
||||
assert remote_regie.is_remote() == True
|
||||
|
||||
page = Page(title='xxx', slug='test_basket_cell', template_name='standard')
|
||||
|
@ -69,47 +76,47 @@ def test_remote_regie_cell(mock_get, remote_regie, user):
|
|||
cell = ActiveItems(regie='remote', page=page, placeholder='content', order=0)
|
||||
context = Context({'request': RequestFactory().get('/')})
|
||||
context['synchronous'] = True # to get fresh content
|
||||
context['request'].user = user
|
||||
|
||||
# fake mellon (SAML) context
|
||||
user = MockUser()
|
||||
context['user'] = user
|
||||
context['request'].session = {'mellon_session': {'name_id_content': '4242'}}
|
||||
context['request'].user = user
|
||||
|
||||
assert cell.is_relevant(context) is True
|
||||
|
||||
# show regie with an invoice
|
||||
mock_json = mock.Mock()
|
||||
mock_json.json.return_value = {'err': 0, 'data': INVOICES}
|
||||
mock_get.return_value = mock_json
|
||||
mock_request.return_value = mock_json
|
||||
content = cell.render(context)
|
||||
assert 'F-2016-One' in content
|
||||
assert '123.45' in content
|
||||
|
||||
# check if regie webservice has been correctly called
|
||||
url = mock_get.call_args[0][0]
|
||||
assert mock_request.call_args[0][0] == 'GET'
|
||||
url = mock_request.call_args[0][1]
|
||||
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
|
||||
assert scheme == 'http'
|
||||
assert netloc == 'passerelle.example.net'
|
||||
assert netloc == 'example.org'
|
||||
assert path == '/regie/invoices/'
|
||||
query = urlparse.parse_qs(querystring, keep_blank_values=True)
|
||||
assert query['NameID'][0] == '4242'
|
||||
assert query['orig'][0] == 'testserver'
|
||||
assert check_query(querystring, '54321') == True
|
||||
assert query['NameID'][0] == 'r2d2'
|
||||
assert query['orig'][0] == 'combo'
|
||||
assert check_query(querystring, 'combo') == True
|
||||
|
||||
# with no invoice
|
||||
mock_json.json.return_value = {'err': 0, 'data': []}
|
||||
content = cell.render(context)
|
||||
assert 'No items yet' in content
|
||||
|
||||
@mock.patch('combo.apps.lingo.models.Regie.pay_item')
|
||||
@mock.patch('combo.apps.lingo.models.Regie.pay_invoice')
|
||||
@mock.patch('combo.apps.lingo.models.requests.get')
|
||||
def test_anonymous_successful_item_payment(mock_get, mock_pay_item, app, remote_regie):
|
||||
def test_anonymous_successful_item_payment(mock_get, mock_pay_invoice, app, remote_regie):
|
||||
assert remote_regie.is_remote() == True
|
||||
encrypt_id = aes_hex_encrypt(settings.SECRET_KEY, 'F201601')
|
||||
mock_json = mock.Mock()
|
||||
mock_json.json.return_value = {'err': 0, 'data': INVOICES[0]}
|
||||
mock_get.return_value = mock_json
|
||||
mock_pay_item.return_value = mock.Mock(status_code=200)
|
||||
mock_pay_invoice.return_value = mock.Mock(status_code=200)
|
||||
resp = app.get('/lingo/item/%s/%s/' % (remote_regie.id, encrypt_id))
|
||||
form = resp.form
|
||||
|
||||
|
|
Loading…
Reference in New Issue