hobo/hobo/rest_authentication.py

114 lines
4.1 KiB
Python

import logging
from rest_framework import authentication, exceptions
from hobo import signature
from django.contrib.auth import get_user_model
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.db.models.fields import FieldDoesNotExist
from django.utils.module_loading import import_string
try:
from mellon.models import UserSAMLIdentifier
except ImportError:
UserSAMLIdentifier = None
class AnonymousAuthenticServiceUser(AnonymousUser):
    '''Virtual user holding permissions for other publik services.

    Every permission check defined here answers True and querysets are
    returned unfiltered. The instance reports itself as both authenticated
    and anonymous.
    '''

    is_authenticated = True
    is_anonymous = True

    def has_perm(self, perm_or_perms, obj=None):
        '''Return True whatever the permission(s) and object.'''
        return True

    def has_ou_perm(self, perm, ou):
        '''Return True whatever the permission and ou.'''
        return True

    def has_perm_any(self, perm):
        '''Return True whatever the permission.'''
        return True

    def filter_by_perm(self, perm_or_perms, qs):
        '''Return qs unchanged.'''
        # all objects are reachable
        return qs

    def __str__(self):
        # Python 3 never calls __unicode__; without this method str() and
        # '%s' formatting (as used when logging the authenticated user)
        # would use the inherited representation instead of this name.
        return 'Publik Service User'

    def __unicode__(self):
        # Kept so that existing callers of __unicode__ keep working.
        return self.__str__()
class AnonymousAdminServiceUser(AnonymousUser):
    '''Virtual staff user standing for other publik services.

    The instance is flagged as staff and reports itself as both
    authenticated and anonymous.
    '''

    is_staff = True
    is_authenticated = True
    is_anonymous = True

    def __str__(self):
        # Python 3 never calls __unicode__; without this method str() and
        # '%s' formatting would use the inherited representation instead
        # of this name.
        return 'Publik Service Admin'

    def __unicode__(self):
        # Kept so that existing callers of __unicode__ keep working.
        return self.__str__()
class PublikAuthentication(authentication.BaseAuthentication):
    '''Authenticate requests carrying a signed query string.

    A request is handled when its query string has non-empty `orig` and
    `signature` parameters. The secret associated with `orig` is looked up
    in settings.KNOWN_SERVICES and used to check the signature of the full
    request path.
    '''

    def __init__(self, *args, **kwargs):
        self.logger = logging.getLogger(__name__)
        super(PublikAuthentication, self).__init__(*args, **kwargs)

    def resolve_user(self, request):
        '''Return the user a signed request acts as.

        With a `NameID` parameter, the user is looked up by `uuid` when the
        user model has such a field, otherwise through UserSAMLIdentifier
        when that model could be imported. Without `NameID`, a user whose
        username equals `orig` is returned if there is one; otherwise an
        instance of settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS is built.

        Raises AuthenticationFailed when no user can be produced.
        '''
        user_model = get_user_model()
        params = request.GET

        if 'NameID' not in params:
            # Request made in the name of the calling service itself.
            service_name = params['orig']
            try:
                return user_model.objects.get(username=service_name)
            except user_model.DoesNotExist:
                pass
            if not hasattr(settings, 'HOBO_ANONYMOUS_SERVICE_USER_CLASS'):
                raise exceptions.AuthenticationFailed('Anonymous service user is unsupported')
            user_class = import_string(settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS)
            self.logger.info('anonymous signature validated')
            return user_class()

        name_id = params['NameID']

        # The presence of a `uuid` field on the user model selects the
        # direct lookup.
        try:
            user_model._meta.get_field('uuid')
        except FieldDoesNotExist:
            has_uuid_field = False
        else:
            has_uuid_field = True

        if has_uuid_field:
            try:
                return user_model.objects.get(uuid=name_id)
            except user_model.DoesNotExist:
                raise exceptions.AuthenticationFailed('No user matches uuid=%r' % name_id)

        if not UserSAMLIdentifier:
            raise exceptions.AuthenticationFailed(
                'No usable model to match nameid=%r' % name_id)

        try:
            return UserSAMLIdentifier.objects.get(name_id=name_id).user
        except UserSAMLIdentifier.DoesNotExist:
            raise exceptions.AuthenticationFailed(
                'No user matches nameid=%r' % name_id)

    def get_orig_key(self, orig):
        '''Return the secret of the known service whose `verif_orig` is orig.

        Raises AuthenticationFailed when settings.KNOWN_SERVICES is missing
        or when no matching service with a non-empty secret is found.
        '''
        if not hasattr(settings, 'KNOWN_SERVICES'):
            self.logger.warning('no known services')
            raise exceptions.AuthenticationFailed('No KNOWN_SERVICES setting')

        # KNOWN_SERVICES maps a service id to a mapping of slug -> service.
        for services_by_slug in settings.KNOWN_SERVICES.values():
            for service in services_by_slug.values():
                if service.get('verif_orig') != orig:
                    continue
                secret = service.get('secret')
                if secret:
                    return secret

        self.logger.warning('no secret found for origin %r', orig)
        raise exceptions.AuthenticationFailed('no secret found for origin %r' % orig)

    def authenticate(self, request):
        '''Return (user, None) for a validly signed request.

        Returns None when `orig` or `signature` is missing or empty, and
        raises AuthenticationFailed when the signature check fails.
        '''
        full_path = request.get_full_path()
        query = request.GET
        if not (query.get('orig') and query.get('signature')):
            return None

        secret = self.get_orig_key(query['orig'])
        if signature.check_url(full_path, secret):
            user = self.resolve_user(request)
            self.logger.info('user authenticated with signature %s', user)
            return (user, None)

        self.logger.warning('invalid signature')
        raise exceptions.AuthenticationFailed('Invalid signature')