api: authenticate OAUTH2 clients through Authentic (fixes #16842)

This commit is contained in:
Josue Kouka 2018-01-31 17:11:16 +01:00 committed by Benjamin Dauvergne
parent 85ebba8394
commit e22648dd3f
3 changed files with 98 additions and 1 deletions

View File

@ -14,6 +14,13 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urlparse
import logging
import requests
from django.conf import settings
from django.utils.translation import ugettext_lazy as _
from rest_framework.authentication import BasicAuthentication
@ -51,12 +58,49 @@ class OAuth2User(object):
class FargoOAUTH2Authentication(BasicAuthentication):
def authenticate_through_idp(self, client_id, client_secret):
'''Check client_id and client_secret with configured IdP, and verify it is an OIDC
client.
'''
logger = logging.getLogger(__name__)
authentic_idp = getattr(settings, 'FARGO_IDP_URL', None)
if not authentic_idp:
logger.warning(u'idp check-password not configured')
return False, ''
url = urlparse.urljoin(authentic_idp, 'api/check-password/')
try:
response = requests.post(url, json={
'username': client_id,
'password': client_secret}, auth=(client_id, client_secret), verify=False)
response.raise_for_status()
except requests.RequestException as e:
logger.warning(u'idp check-password API failed: %s', e)
return False, 'idp is down'
try:
response = response.json()
except ValueError as e:
logger.warning(u'idp check-password API failed: %s, %r', e, response.content)
return False, 'idp is down'
if response.get('result') == 0:
logger.warning(u'idp check-password API failed')
return False, response.get('errors', [''])[0]
return True, None
def authenticate_credentials(self, client_id, client_secret):
try:
client = OAuth2Client.objects.get(
client_id=client_id, client_secret=client_secret)
except OAuth2Client.DoesNotExist:
raise AuthenticationFailed(_('Invalid client_id/client_secret.'))
success, error = self.authenticate_through_idp(client_id, client_secret)
if not success:
raise AuthenticationFailed(error or _('Invalid client_id/client_secret.'))
client = OAuth2Client.objects.get(client_id=client_id)
client.client_secret = client_secret
client.save()
user = OAuth2User(client)
user.authenticated = True

View File

@ -104,6 +104,7 @@ setup(
'django-filter>1',
'djangorestframework>=3.3,<3.4',
'file-magic',
'requests',
],
zip_safe=False,
cmdclass={

View File

@ -1,3 +1,5 @@
import json
import mock
import pytest
from urllib import quote
import urlparse
@ -14,6 +16,12 @@ from test_manager import login
pytestmark = pytest.mark.django_db
class FakedResponse(mock.Mock):
def json(self):
return json.loads(self.content)
@pytest.fixture
def oauth2_client():
return OAuth2Client.objects.create(
@ -160,3 +168,47 @@ def test_confirm_put_document_file_exception(app, john_doe, user_doc):
url += '?%s' % urlencode({'redirect_uri': 'https://example.com'})
resp = app.get(url)
assert 'This document is already in your portfolio' in resp.content
@mock.patch('fargo.oauth2.authentication.requests.post')
def test_idp_authentication(mocked_post, settings, app, oauth2_client, john_doe, user_doc):
login(app, user=john_doe)
url = reverse('oauth2-authorize')
params = {
'client_id': oauth2_client.client_id,
'client_secret': 'fake',
'response_type': 'code',
'state': 'achipeachope',
'redirect': 'https://example.com/'
}
params['redirect_uri'] = 'https://example.com'
resp = app.get(url, params=params)
resp.forms[0]['document'].select('1')
resp = resp.forms[0].submit()
auth = OAuth2Authorize.objects.filter(user_document__user=john_doe)[0]
params.pop('response_type')
params.pop('state')
params['grant_type'] = 'authorization_code'
params['code'] = auth.code
url = reverse('oauth2-get-token')
# when remote remote idp not set
app.authorization = ('Basic', ('client-id', 'fake'))
resp = app.post(url, params=params, status=401)
resp.json['detail'] == 'Invalid client_id/client_secret.'
# when remote idp fails to authenticate rp
settings.FARGO_IDP_URL = 'https://idp.example.org'
response = {
"result": 0, "errors": ["Invalid username/password."]
}
mocked_post.return_value = FakedResponse(content=json.dumps(response))
resp = app.post(url, params=params, status=401)
resp.json['detail'] == 'Invalid client_id/client_secret.'
# when remote idp authenticates rp
response = {"result": 1, "errors": []}
mocked_post.return_value = FakedResponse(content=json.dumps(response))
resp = app.post(url, params=params, status=200)
assert resp.json['access_token'] == auth.access_token
url = reverse('oauth2-get-document')
app.authorization = ('Bearer', str(auth.access_token))
resp = app.get(url, status=200)