# fargo - document box # Copyright (C) 2016-2019 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import logging import requests from django.conf import settings from django.utils.six.moves.urllib import parse as urlparse from django.utils.translation import ugettext_lazy as _ from rest_framework.authentication import BasicAuthentication from rest_framework.exceptions import AuthenticationFailed from .models import OAuth2Client class OAuth2User(object): """ Fake user class to return in case OAuth2 Client authentication """ def __init__(self, oauth2_client): self.oauth2_client = oauth2_client self.authenticated = False def has_perm(self, *args, **kwargs): return True def has_perm_any(self, *args, **kwargs): return True def has_ou_perm(self, *args, **kwargs): return True def filter_by_perm(self, perms, queryset): return queryset def is_authenticated(self): return self.authenticated def is_staff(self): return False class FargoOAUTH2Authentication(BasicAuthentication): def authenticate_through_idp(self, client_id, client_secret): '''Check client_id and client_secret with configured IdP, and verify it is an OIDC client. ''' logger = logging.getLogger(__name__) authentic_idp = getattr(settings, 'FARGO_IDP_URL', None) if not authentic_idp: logger.warning(u'idp check-password not configured') return False, '' url = urlparse.urljoin(authentic_idp, 'api/check-password/') try: response = requests.post(url, json={ 'username': client_id, 'password': client_secret}, auth=(client_id, client_secret), verify=False) response.raise_for_status() except requests.RequestException as e: logger.warning(u'idp check-password API failed: %s', e) return False, 'idp is down' try: response = response.json() except ValueError as e: logger.warning(u'idp check-password API failed: %s, %r', e, response.content) return False, 'idp is down' if response.get('result') == 0: logger.warning(u'idp check-password API failed') return False, response.get('errors', [''])[0] return True, None def authenticate_credentials(self, client_id, client_secret): try: client = OAuth2Client.objects.get( client_id=client_id, client_secret=client_secret) except OAuth2Client.DoesNotExist: success, error = self.authenticate_through_idp(client_id, client_secret) if not success: raise AuthenticationFailed(error or _('Invalid client_id/client_secret.')) client = OAuth2Client.objects.get(client_id=client_id) client.client_secret = client_secret client.save() user = OAuth2User(client) user.authenticated = True return user, True