fargo/tests/test_oauth2.py

274 lines
10 KiB
Python

# -*- coding: utf-8 -*-
# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import json
import mock
import pytest
from django.core.urlresolvers import reverse
from django.core.management import call_command
from django.utils.http import quote, urlencode
from django.utils.six.moves.urllib import parse as urlparse
from fargo.oauth2.models import OAuth2Client, OAuth2Authorize, OAuth2TempFile
from fargo.fargo.models import UserDocument
from test_manager import login
# Module-level pytest marker: applies ``pytest.mark.django_db`` to every test
# collected from this file.
pytestmark = pytest.mark.django_db
class FakedResponse(mock.Mock):
    """Mock object standing in for an HTTP response in these tests.

    Instances are built with a ``content`` attribute holding a JSON
    document as text; ``json()`` decodes it.
    """

    def json(self):
        """Return ``self.content`` decoded from JSON."""
        raw_body = self.content
        return json.loads(raw_body)
@pytest.fixture
def oauth2_client():
    """Create and return the ``OAuth2Client`` row shared by the tests.

    The redirect URIs are given as a single space-separated string.
    """
    attributes = {
        'client_name': 'test_oauth2',
        'client_id': 'client-id',
        'client_secret': 'client-secret',
        'redirect_uris': 'https://example.net/document https://doc.example.net/ https://example.com',
    }
    return OAuth2Client.objects.create(**attributes)
def assert_error_redirect(url, error):
    """Assert that the query string of ``url`` is exactly ``error=<error>``.

    The comparison is made on the whole query string, so any additional
    parameter in ``url`` makes the assertion fail.
    """
    expected_query = 'error=%s' % error
    actual_query = urlparse.urlparse(url).query
    assert actual_query == expected_query
def test_get_document_oauth2(app, john_doe, oauth2_client, user_doc):
    """Walk through the authorization-code flow used to fetch a document.

    The test first checks the error answers of the ``oauth2-authorize``
    view, then selects a document in the authorization form, exchanges
    the resulting code on ``oauth2-get-token`` and finally downloads the
    document from ``oauth2-get-document`` with the bearer token.
    """
    login(app, user=john_doe)
    authorize_url = reverse('oauth2-authorize')
    params = {
        'client_secret': oauth2_client.client_secret,
        'response_type': 'code',
        'state': 'achipeachope'
    }

    # no redirect_uri at all: answered with a plain 400
    response = app.get(authorize_url, params={}, status=400)
    assert response.text == 'missing redirect_uri parameter'

    # redirect_uri given, client_id still missing
    params['redirect_uri'] = 'https://toto.example.com'
    response = app.get(authorize_url, params=params, status=302)
    assert_error_redirect(response.url, 'invalid_request')

    # client_id given, but an invalid response type
    params.update(client_id=oauth2_client.client_id, response_type='token')
    response = app.get(authorize_url, params=params, status=302)
    assert_error_redirect(response.url, 'unsupported_response_type')

    # valid response type, invalid redirect uri
    params['response_type'] = 'code'
    response = app.get(authorize_url, params=params, status=302)
    assert_error_redirect(response.url, 'invalid_redirect_uri')

    # valid request: the document selection form is served
    params['redirect_uri'] = 'https://example.com'
    response = app.get(authorize_url, params=params)
    assert response.status_code == 200
    choices = response.form['document'].options
    assert len(choices) == 2
    assert u'éléphant.txt' in choices[1]

    # pick the second entry, 'éléphant.txt', and submit the form
    response.form['document'].select(choices[1][0])
    response = response.form.submit()

    # the authorization must have been recorded for the user's document
    authorizations = OAuth2Authorize.objects.filter(user_document__user=john_doe)
    assert len(authorizations) == 1
    auth = authorizations[0]
    assert response.status_code == 302
    redirect_args = urlparse.parse_qs(urlparse.urlparse(response.location).query)
    assert [auth.code] == redirect_args['code']
    assert ['achipeachope'] == redirect_args['state']

    # exchange the authorization code for an access token
    del params['response_type']
    del params['state']
    params.update(grant_type='authorization_code', code=auth.code)
    token_url = reverse('oauth2-get-token')
    app.authorization = ('Basic', (oauth2_client.client_id, oauth2_client.client_secret))
    response = app.post(token_url, params=params, status=200)
    token_payload = response.json
    assert 'access_token' in token_payload
    assert 'expires' in token_payload
    assert token_payload['access_token'] == auth.access_token

    # download the document with the bearer token
    document_url = reverse('oauth2-get-document')
    app.authorization = ('Bearer', str(auth.access_token))
    response = app.get(document_url, status=200)
    assert response.content_type == 'application/octet-stream'
    assert 'Content-disposition' in response.headers
    disposition = response.content_disposition.replace(' ', '').split(';')
    assert disposition[0] == 'attachment'
    assert disposition[1] == 'filename="?l?phant.txt"'
    assert disposition[2] == 'filename*=UTF-8\'\'%C3%A9l%C3%A9phant.txt'
def test_put_document(app, john_doe, oauth2_client):
    """Push a document through ``oauth2-put-document`` and confirm it.

    Checks, in order: the upload is refused with 401 without credentials
    and with 400 without a content-disposition header; the same document
    can be pushed twice; the authorization page leaves the stored objects
    untouched; submitting its form redirects to the given ``redirect_uri``
    and turns one temporary file into a ``UserDocument`` of the user.
    """
    login(app, user=john_doe)
    # Locate the sample file next to this module so that the test does not
    # depend on the directory pytest is started from.
    sample_path = os.path.join(os.path.dirname(__file__), 'test_oauth2.txt')
    with open(sample_path, 'rb') as f:
        data = f.read()

    url = reverse('oauth2-put-document')
    # no credentials
    app.post(url, params=data, status=401)

    app.authorization = ('Basic', (str(oauth2_client.client_id), str(oauth2_client.client_secret)))
    resp = app.post(url, params=data, status=400)
    assert 'missing content-disposition header' in resp.text

    filename = 'éléphant.txt'
    percent_encode_filename = quote(filename, safe='')
    headers = {
        'Content-disposition': 'attachment; filename="%s"; filename*=UTF-8\'\'%s' % (filename, percent_encode_filename)
    }

    assert OAuth2TempFile.objects.count() == 0
    resp = app.post(url, params=data, headers=headers, status=200)
    # test that we can still push the same document
    resp = app.post(url, params=data, headers=headers, status=200)
    assert OAuth2TempFile.objects.count() == 2

    # the last answer must point at the authorization page of the newest upload
    doc = OAuth2TempFile.objects.latest('creation_date')
    location = reverse('oauth2-put-document-authorize', kwargs={'pk': doc.pk})
    assert location in resp.location

    # the authorization page is fetched without the client's Basic credentials
    app.authorization = None
    url = location + '?%s' % urlencode({'redirect_uri': 'https://example.com'})
    resp = app.get(url, status=200)
    # merely displaying the page must not create or remove anything
    assert OAuth2TempFile.objects.count() == 2
    assert UserDocument.objects.count() == 0

    resp = resp.form.submit()
    assert resp.status_code == 302
    assert resp.location == 'https://example.com'
    assert OAuth2TempFile.objects.count() == 1
    assert UserDocument.objects.count() == 1
    assert OAuth2TempFile.objects.get().document == UserDocument.objects.get().document
    assert UserDocument.objects.filter(user=john_doe, document=doc.document, filename=u'éléphant.txt').exists()
def test_confirm_put_document_file_exception(app, oauth2_client, john_doe, user_doc):
    """Check the two error messages of ``oauth2-put-document-authorize``.

    One request targets a pk matching no temporary file, the other a
    temporary file built from ``user_doc``'s document and filename.
    """
    login(app, user=john_doe)
    pending_upload = OAuth2TempFile.objects.create(
        client=oauth2_client,
        document=user_doc.document,
        filename=user_doc.filename)
    query_string = urlencode({'redirect_uri': 'https://example.com'})

    # pk that does not designate any temporary file
    unknown_url = '%s?%s' % (
        reverse('oauth2-put-document-authorize', kwargs={'pk': 'fakemofo'}),
        query_string)
    response = app.get(unknown_url)
    assert 'The document has not been uploaded' in response.text

    # temporary file wrapping the document of ``user_doc``
    # (presumably owned by john_doe -- confirm against the fixture)
    known_url = '%s?%s' % (
        reverse('oauth2-put-document-authorize', kwargs={'pk': pending_upload.pk}),
        query_string)
    response = app.get(known_url)
    assert 'This document is already in your portfolio' in response.text
@mock.patch('fargo.oauth2.authentication.requests.post')
def test_idp_authentication(mocked_post, settings, app, oauth2_client, john_doe, user_doc):
    """Check client authentication on the token endpoint with a wrong secret.

    The client presents ``'fake'`` as its secret. The token request must
    be rejected with 401 while ``FARGO_IDP_URL`` is not set by the test,
    rejected with 401 when the mocked IdP call reports a failure, and
    accepted once the mocked IdP call reports success.
    """
    login(app, user=john_doe)
    url = reverse('oauth2-authorize')
    params = {
        'client_id': oauth2_client.client_id,
        'client_secret': 'fake',
        'response_type': 'code',
        'state': 'achipeachope',
        'redirect': 'https://example.com/'
    }
    params['redirect_uri'] = 'https://example.com'
    resp = app.get(url, params=params)
    options = resp.form['document'].options
    assert u'éléphant.txt' in options[1]
    resp.form['document'].select(options[1][0])
    resp = resp.form.submit()
    auth = OAuth2Authorize.objects.filter(user_document__user=john_doe)[0]

    params.pop('response_type')
    params.pop('state')
    params['grant_type'] = 'authorization_code'
    params['code'] = auth.code
    url = reverse('oauth2-get-token')

    # when remote idp is not set
    app.authorization = ('Basic', ('client-id', 'fake'))
    resp = app.post(url, params=params, status=401)
    # NOTE(review): the expected message comes from a comparison that was
    # written without ``assert`` -- confirm it against the authentication
    # backend if this check fails.
    assert resp.json['detail'] == 'Invalid client_id/client_secret.'

    # when remote idp fails to authenticate rp
    settings.FARGO_IDP_URL = 'https://idp.example.org'
    response = {
        "result": 0, "errors": ["Invalid username/password."]
    }
    mocked_post.return_value = FakedResponse(content=json.dumps(response))
    resp = app.post(url, params=params, status=401)
    # NOTE(review): same remark as above regarding the expected message.
    assert resp.json['detail'] == 'Invalid client_id/client_secret.'

    # when remote idp authenticates rp
    response = {"result": 1, "errors": []}
    mocked_post.return_value = FakedResponse(content=json.dumps(response))
    resp = app.post(url, params=params, status=200)
    assert resp.json['access_token'] == auth.access_token

    # the issued token must give access to the document
    url = reverse('oauth2-get-document')
    app.authorization = ('Bearer', str(auth.access_token))
    app.get(url, status=200)
def test_command_create_client(db):
    """Exercise the ``oauth2-create-client`` management command.

    The command is run once with only a name and a redirect URI, then,
    after deleting the created client, once more with explicit
    ``--client-id`` and ``--client-secret`` options.
    """
    name, redirect_uri = 'test', 'https://example.com/'

    # credentials left for the command to fill in
    call_command('oauth2-create-client', name, redirect_uri)
    generated = OAuth2Client.objects.get()
    assert generated.client_name == name
    assert generated.redirect_uris == redirect_uri
    assert generated.client_id
    assert generated.client_secret

    OAuth2Client.objects.all().delete()

    # credentials imposed on the command line
    call_command(
        'oauth2-create-client', name, redirect_uri,
        '--client-id=wtf', '--client-secret=whocares')
    explicit = OAuth2Client.objects.get()
    assert explicit.client_name == name
    assert explicit.redirect_uris == redirect_uri
    assert explicit.client_id == 'wtf'
    assert explicit.client_secret == 'whocares'
def test_command_put_document(db, capsys, app, john_doe):
    """Exercise the ``oauth2-put-document`` management command end to end.

    The command prints a URL on stdout; the test opens it, logs in
    through the form reached after the redirect, accepts the document
    and checks that the temporary file became a ``UserDocument``.
    """
    redirect_uri = 'https://example.com/'
    call_command('oauth2-create-client', 'test', redirect_uri)
    client = OAuth2Client.objects.get()

    sample_pdf = os.path.join(os.path.dirname(__file__), 'pdf-sample.pdf')
    call_command('oauth2-put-document', '--client-id=%s' % client.pk, redirect_uri, sample_pdf)
    stdout, stderr = capsys.readouterr()
    assert stderr == ''

    # the command output is the URL to visit
    page = app.get(stdout.strip()).follow()
    # the username is also used as password
    # (presumably how the john_doe fixture is set up -- confirm)
    for field in ('username', 'password'):
        page.form.set(field, john_doe.username)
    page = page.form.submit().follow()

    assert 'pdf-sample.pdf' in page
    temp_file = OAuth2TempFile.objects.get()
    assert temp_file.uuid in page

    page = page.form.submit('accept')
    assert page['Location'] == redirect_uri
    assert UserDocument.objects.filter(user=john_doe, document=temp_file.document).exists()
    assert OAuth2TempFile.objects.count() == 0