fargo/tests/test_oauth2.py

208 lines
7.7 KiB
Python

import json
import mock
import pytest
from urllib import quote
import urlparse
from django.core.urlresolvers import reverse
from django.utils.http import urlencode
from fargo.oauth2.models import OAuth2Client, OAuth2Authorize, OAuth2TempFile
from fargo.fargo.models import UserDocument
from test_manager import login
pytestmark = pytest.mark.django_db
class FakedResponse(mock.Mock):
def json(self):
return json.loads(self.content)
@pytest.fixture
def oauth2_client():
return OAuth2Client.objects.create(
client_name='test_oauth2', client_id='client-id', client_secret='client-secret',
redirect_uris='https://example.net/document https://doc.example.net/ https://example.com')
def assert_error_redirect(url, error):
assert urlparse.urlparse(url).query == 'error=%s' % error
def test_get_document_oauth2(app, john_doe, oauth2_client, user_doc):
login(app, user=john_doe)
url = reverse('oauth2-authorize')
params = {
'client_secret': oauth2_client.client_secret,
'response_type': 'code',
'state': 'achipeachope'
}
# test missing redirect_uri
resp = app.get(url, params={}, status=400)
assert resp.content == 'missing redirect_uri parameter'
# test missing client id
params['redirect_uri'] = 'https://toto.example.com'
resp = app.get(url, params=params, status=302)
assert_error_redirect(resp.url, 'invalid_request')
# test invalid response type
params['client_id'] = oauth2_client.client_id
params['response_type'] = 'token'
resp = app.get(url, params=params, status=302)
assert_error_redirect(resp.url, 'unsupported_response_type')
# test invalid redirect uri
params['response_type'] = 'code'
resp = app.get(url, params=params, status=302)
assert_error_redirect(resp.url, 'invalid_redirect_uri')
params['redirect_uri'] = 'https://example.com'
resp = app.get(url, params=params)
assert resp.status_code == 200
assert len(resp.forms[0]['document'].options) == 2
assert 'Baudelaire.txt' in resp.forms[0]['document'].options[1]
resp.forms[0]['document'].select('1')
resp = resp.forms[0].submit()
assert len(OAuth2Authorize.objects.filter(user_document__user=john_doe)) == 1
auth = OAuth2Authorize.objects.filter(user_document__user=john_doe)[0]
assert resp.status_code == 302
assert 'code' in resp.location
assert auth.code in resp.location
assert 'state' in resp.location
assert 'achipeachope' in resp.location
params.pop('response_type')
params.pop('state')
params['grant_type'] = 'authorization_code'
params['code'] = auth.code
url = reverse('oauth2-get-token')
app.authorization = ('Basic', (oauth2_client.client_id, oauth2_client.client_secret))
resp = app.post(url, params=params, status=200)
assert 'access_token' in resp.json
assert 'expires' in resp.json
assert resp.json['access_token'] == auth.access_token
url = reverse('oauth2-get-document')
app.authorization = ('Bearer', str(auth.access_token))
resp = app.get(url, status=200)
assert resp.content_type == 'application/octet-stream'
assert 'Content-disposition' in resp.headers
content_disposition = resp.content_disposition.replace(' ', '').split(';')
assert content_disposition[0] == 'attachment'
assert content_disposition[1] == 'filename="Baudelaire.txt"'
assert content_disposition[2] == 'filename*=UTF-8\'\'Baudelaire.txt'
def test_put_document(app, john_doe, oauth2_client):
login(app, user=john_doe)
with open('tests/test_oauth2.txt', 'rb') as f:
data = f.read()
url = reverse('oauth2-put-document')
resp = app.post(url, params=data, status=401)
app.authorization = ('Basic', (str(oauth2_client.client_id), str(oauth2_client.client_secret)))
resp = app.post(url, params=data, status=400)
assert 'missing content-disposition header' in resp.content
filename = 'Baudelaire.txt'.encode('ascii', 'replace')
percent_encode_filename = quote(filename.encode('utf8'), safe='')
headers = {
'Content-disposition': 'attachment; filename="%s"; filename*=UTF-8\'\'%s' % (filename, percent_encode_filename)
}
assert len(OAuth2TempFile.objects.all()) == 0
resp = app.post(url, params=data, headers=headers, status=200)
# test that we can still push the same document
resp = app.post(url, params=data, headers=headers, status=200)
assert len(OAuth2TempFile.objects.all()) == 2
doc = OAuth2TempFile.objects.latest('creation_date')
location = reverse('oauth2-put-document-authorize', kwargs={'pk': doc.pk})
assert location in resp.location
app.authorization = None
url = location + '?%s' % urlencode({'redirect_uri': 'https://example.com'})
resp = app.get(url, status=200)
assert OAuth2TempFile.objects.count() == 2
assert UserDocument.objects.count() == 0
resp = resp.forms[0].submit()
assert resp.status_code == 302
assert resp.location == 'https://example.com'
assert OAuth2TempFile.objects.count() == 1
assert UserDocument.objects.count() == 1
assert OAuth2TempFile.objects.get().document == UserDocument.objects.get().document
assert UserDocument.objects.filter(user=john_doe, document=doc.document, filename='Baudelaire.txt').exists()
def test_confirm_put_document_file_exception(app, oauth2_client, john_doe, user_doc):
login(app, user=john_doe)
oauth_tmp_file = OAuth2TempFile.objects.create(
client=oauth2_client,
document=user_doc.document,
filename=user_doc.filename)
url = reverse('oauth2-put-document-authorize', kwargs={'pk': 'fakemofo'})
url += '?%s' % urlencode({'redirect_uri': 'https://example.com'})
resp = app.get(url)
assert 'The document has not been uploaded' in resp.content
url = reverse('oauth2-put-document-authorize', kwargs={'pk': oauth_tmp_file.pk})
url += '?%s' % urlencode({'redirect_uri': 'https://example.com'})
resp = app.get(url)
assert 'This document is already in your portfolio' in resp.content
@mock.patch('fargo.oauth2.authentication.requests.post')
def test_idp_authentication(mocked_post, settings, app, oauth2_client, john_doe, user_doc):
login(app, user=john_doe)
url = reverse('oauth2-authorize')
params = {
'client_id': oauth2_client.client_id,
'client_secret': 'fake',
'response_type': 'code',
'state': 'achipeachope',
'redirect': 'https://example.com/'
}
params['redirect_uri'] = 'https://example.com'
resp = app.get(url, params=params)
resp.forms[0]['document'].select('1')
resp = resp.forms[0].submit()
auth = OAuth2Authorize.objects.filter(user_document__user=john_doe)[0]
params.pop('response_type')
params.pop('state')
params['grant_type'] = 'authorization_code'
params['code'] = auth.code
url = reverse('oauth2-get-token')
# when remote remote idp not set
app.authorization = ('Basic', ('client-id', 'fake'))
resp = app.post(url, params=params, status=401)
resp.json['detail'] == 'Invalid client_id/client_secret.'
# when remote idp fails to authenticate rp
settings.FARGO_IDP_URL = 'https://idp.example.org'
response = {
"result": 0, "errors": ["Invalid username/password."]
}
mocked_post.return_value = FakedResponse(content=json.dumps(response))
resp = app.post(url, params=params, status=401)
resp.json['detail'] == 'Invalid client_id/client_secret.'
# when remote idp authenticates rp
response = {"result": 1, "errors": []}
mocked_post.return_value = FakedResponse(content=json.dumps(response))
resp = app.post(url, params=params, status=200)
assert resp.json['access_token'] == auth.access_token
url = reverse('oauth2-get-document')
app.authorization = ('Bearer', str(auth.access_token))
resp = app.get(url, status=200)