128 lines
4.4 KiB
Python
128 lines
4.4 KiB
Python
import logging
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.models import AnonymousUser
|
|
from django.core.exceptions import FieldDoesNotExist
|
|
from django.utils.module_loading import import_string
|
|
from rest_framework import authentication, exceptions, status
|
|
|
|
from hobo import signature
|
|
|
|
try:
|
|
from mellon.models import UserSAMLIdentifier
|
|
except ImportError:
|
|
UserSAMLIdentifier = None
|
|
|
|
|
|
class AnonymousAuthenticServiceUser(AnonymousUser):
    '''Virtual user that holds permissions on behalf of other Publik services.

    Every permission check answers True and queryset filtering is a no-op,
    so all objects are reachable for this user.
    '''

    # The user is flagged both anonymous and authenticated.
    is_anonymous = True
    is_authenticated = True
    ou = None

    def has_perm(self, *args, **kwargs):
        '''Grant any requested permission, whatever the arguments.'''
        return True

    def has_ou_perm(self, *args, **kwargs):
        '''Grant any requested permission, whatever the arguments.'''
        return True

    def has_perm_any(self, *args, **kwargs):
        '''Grant any requested permission, whatever the arguments.'''
        return True

    def filter_by_perm(self, perm_or_perms, qs):
        '''Return ``qs`` unchanged: all objects are reachable.'''
        return qs

    def __str__(self):
        '''Return the display label of this virtual user.'''
        # Python 3 never calls __unicode__; without __str__ the label
        # inherited from AnonymousUser would be used in logs instead.
        return 'Publik Service User'

    # Kept so that Python 2 style callers of __unicode__ keep working.
    __unicode__ = __str__
|
|
|
|
|
|
class AnonymousAdminServiceUser(AnonymousUser):
    '''Virtual staff user that holds permissions for other Publik services.'''

    # Flagged as staff, and as both anonymous and authenticated.
    is_staff = True
    is_anonymous = True
    is_authenticated = True
    ou = None

    def __str__(self):
        '''Return the display label of this virtual user.'''
        # Python 3 never calls __unicode__; without __str__ the label
        # inherited from AnonymousUser would be used in logs instead.
        return 'Publik Service Admin'

    # Kept so that Python 2 style callers of __unicode__ keep working.
    __unicode__ = __str__
|
|
|
|
|
|
class PublikAuthenticationFailed(exceptions.APIException):
    '''API error raised when Publik signature authentication fails.

    Uses the HTTP 401 status and exposes a detail payload of the form
    ``{'err': 1, 'err_desc': <code>}``.
    '''

    default_code = 'invalid-signature'
    status_code = status.HTTP_401_UNAUTHORIZED

    def __init__(self, code):
        '''Store ``code`` as the error description of the detail payload.'''
        # The detail is set directly as a plain dict.
        self.detail = dict(err=1, err_desc=code)
|
|
|
|
|
|
class PublikAuthentication(authentication.BaseAuthentication):
    '''Authenticate requests whose URL was signed by a known Publik service.

    A request is only handled when its query string carries non-empty
    ``orig`` and ``signature`` parameters. The shared secret for ``orig`` is
    looked up in ``settings.KNOWN_SERVICES`` and used to verify the signature
    of the full request path; the request is then mapped to a user.
    '''

    def __init__(self, *args, **kwargs):
        self.logger = logging.getLogger(__name__)
        super(PublikAuthentication, self).__init__(*args, **kwargs)

    def resolve_user(self, request):
        '''Return the user a signed request acts as.

        With a ``NameID`` query parameter:
          * if the user model has a ``uuid`` field, the user whose uuid is
            the NameID is returned;
          * otherwise, if ``UserSAMLIdentifier`` could be imported, the user
            attached to the identifier with that ``name_id`` is returned;
          * otherwise authentication fails with ``no-usable-model``.

        Without ``NameID``, the user whose username equals ``orig`` is
        returned; failing that, an instance of the class named by
        ``settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS`` when that setting
        exists.

        Raises PublikAuthenticationFailed when no user can be resolved.
        '''
        User = get_user_model()

        if 'NameID' in request.GET:
            name_id = request.GET['NameID']

            # Probe the user model for a ``uuid`` field to choose the lookup.
            try:
                User._meta.get_field('uuid')
            except FieldDoesNotExist:
                has_uuid_field = False
            else:
                has_uuid_field = True

            if has_uuid_field:
                try:
                    return User.objects.get(uuid=name_id)
                except User.DoesNotExist:
                    raise PublikAuthenticationFailed('user-not-found')
            elif UserSAMLIdentifier:
                try:
                    return UserSAMLIdentifier.objects.get(name_id=name_id).user
                except UserSAMLIdentifier.DoesNotExist:
                    raise PublikAuthenticationFailed('user-not-found')
            else:
                raise PublikAuthenticationFailed('no-usable-model')

        # No NameID: the request is made in the name of the calling service.
        orig = request.GET['orig']
        try:
            return User.objects.get(username=orig)
        except User.DoesNotExist:
            # Fall through to the configurable anonymous service user.
            pass

        if hasattr(settings, 'HOBO_ANONYMOUS_SERVICE_USER_CLASS'):
            klass = import_string(settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS)
            self.logger.info('anonymous signature validated')
            return klass()

        raise PublikAuthenticationFailed('no-user-for-orig')

    def get_orig_key(self, orig):
        '''Return the shared secret configured for origin ``orig``.

        ``settings.KNOWN_SERVICES`` is walked as a two-level mapping; the
        first service whose ``verif_orig`` equals ``orig`` and which has a
        non-empty ``secret`` provides the key.

        Raises PublikAuthenticationFailed when the setting is missing or no
        matching service with a secret is found.
        '''
        if not hasattr(settings, 'KNOWN_SERVICES'):
            self.logger.warning('no known services')
            raise PublikAuthenticationFailed('no-known-services-setting')

        for services in settings.KNOWN_SERVICES.values():
            for service in services.values():
                if service.get('verif_orig') == orig and service.get('secret'):
                    return service['secret']

        self.logger.warning('no secret found for origin %r', orig)
        raise PublikAuthenticationFailed('no-secret-found-for-orig')

    def authenticate(self, request):
        '''Authenticate ``request`` from its URL signature.

        Returns None when ``orig`` or ``signature`` is missing or empty in
        the query string, otherwise a ``(user, None)`` tuple.

        Raises PublikAuthenticationFailed when the origin is unknown, the
        signature is invalid, or no user can be resolved.
        '''
        full_path = request.get_full_path()
        if not request.GET.get('orig') or not request.GET.get('signature'):
            return None

        key = self.get_orig_key(request.GET['orig'])

        # The check must not live in an ``assert``: assert statements are
        # removed under ``python -O``, which would skip verification entirely.
        try:
            is_valid = signature.check_url(full_path, key, raise_on_error=True)
        except signature.SignatureError as e:
            self.logger.warning('publik rest-framework-authentication failed: %s', e)
            raise PublikAuthenticationFailed(str(e))

        if not is_valid:
            # signature.check_url should never return False with
            # raise_on_error; refuse the request anyway if it does.
            self.logger.warning(
                'publik rest-framework-authentication failed: %s', 'invalid-signature'
            )
            raise PublikAuthenticationFailed('invalid-signature')

        user = self.resolve_user(request)
        self.logger.info('user authenticated with signature %s', user)
        return (user, None)
|