115 lines
4.1 KiB
Python
115 lines
4.1 KiB
Python
import logging
|
|
|
|
from rest_framework import authentication, exceptions
|
|
|
|
from hobo import signature
|
|
|
|
from django.contrib.auth import get_user_model
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import AnonymousUser
|
|
from django.db.models.fields import FieldDoesNotExist
|
|
from django.utils.module_loading import import_string
|
|
|
|
try:
|
|
from mellon.models import UserSAMLIdentifier
|
|
except ImportError:
|
|
UserSAMLIdentifier = None
|
|
|
|
|
|
class AnonymousAuthenticServiceUser(AnonymousUser):
    """Virtual user holding permissions for other Publik services.

    Every permission check defined on this class answers ``True`` and
    ``filter_by_perm`` hands back the queryset it was given, unfiltered.
    """

    def is_authenticated(self):
        """Report this virtual user as authenticated."""
        # Deliberately kept as a plain method: existing callers may invoke
        # ``user.is_authenticated()``, and attribute-style truth tests still
        # pass because a bound method is truthy.
        # NOTE(review): confirm against the Django version in use before
        # turning this into an attribute or property.
        return True

    def has_perm(self, perm_or_perms, obj=None):
        """Grant any permission, whatever the target object."""
        return True

    def has_ou_perm(self, perm, ou):
        """Grant any permission, whatever the ``ou`` argument."""
        return True

    def has_perm_any(self, perm):
        """Grant any permission."""
        return True

    def filter_by_perm(self, perm_or_perms, qs):
        """Return ``qs`` unchanged: all objects are reachable."""
        return qs

    def __str__(self):
        """Return the human-readable label of this virtual user."""
        # Python 3 never calls ``__unicode__``; without ``__str__`` the label
        # below would be ignored there and the inherited one used instead.
        return 'Publik Service User'

    # Python 2 spelling, kept so the label is the same on both interpreters.
    __unicode__ = __str__
|
|
|
|
|
|
class AnonymousAdminServiceUser(AnonymousUser):
    """Virtual user for other Publik services, flagged as staff.

    It sets ``is_staff = True`` and reports itself as authenticated; no
    permission method is overridden here.
    """

    is_staff = True

    def is_authenticated(self):
        """Report this virtual user as authenticated."""
        # Deliberately kept as a plain method: existing callers may invoke
        # ``user.is_authenticated()``, and attribute-style truth tests still
        # pass because a bound method is truthy.
        # NOTE(review): confirm against the Django version in use before
        # turning this into an attribute or property.
        return True

    def __str__(self):
        """Return the human-readable label of this virtual user."""
        # Python 3 never calls ``__unicode__``; without ``__str__`` the label
        # below would be ignored there and the inherited one used instead.
        return 'Publik Service Admin'

    # Python 2 spelling, kept so the label is the same on both interpreters.
    __unicode__ = __str__
|
|
|
|
|
|
class PublikAuthentication(authentication.BaseAuthentication):
    """Authenticate requests carrying a signed query string.

    The request must provide ``orig`` and ``signature`` query parameters;
    the signature is checked with the secret configured for that origin in
    ``settings.KNOWN_SERVICES``.
    """

    def __init__(self, *args, **kwargs):
        """Attach a module-named logger, then run the base initialiser."""
        self.logger = logging.getLogger(__name__)
        super(PublikAuthentication, self).__init__(*args, **kwargs)

    def resolve_user(self, request):
        """Return the user object the request should run as.

        With a ``NameID`` query parameter the user is looked up by ``uuid``
        when the user model has such a field, otherwise through
        ``UserSAMLIdentifier`` when it could be imported.  Without ``NameID``
        the ``orig`` parameter is tried as a username, falling back to an
        instance of ``settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS``.

        Raises ``AuthenticationFailed`` when no user can be resolved.
        """
        user_model = get_user_model()
        params = request.GET

        if 'NameID' not in params:
            # No end user named: the calling service itself is the user.
            service_name = params['orig']
            try:
                return user_model.objects.get(username=service_name)
            except user_model.DoesNotExist:
                pass
            if not hasattr(settings, 'HOBO_ANONYMOUS_SERVICE_USER_CLASS'):
                raise exceptions.AuthenticationFailed('Anonymous service user is unsupported')
            service_user_class = import_string(settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS)
            self.logger.info('anonymous signature validated')
            return service_user_class()

        name_id = params['NameID']

        # Probe the user model for a ``uuid`` field to pick the lookup.
        try:
            user_model._meta.get_field('uuid')
        except FieldDoesNotExist:
            has_uuid_field = False
        else:
            has_uuid_field = True

        if has_uuid_field:
            try:
                return user_model.objects.get(uuid=name_id)
            except user_model.DoesNotExist:
                raise exceptions.AuthenticationFailed('No user matches uuid=%r' % name_id)

        if not UserSAMLIdentifier:
            raise exceptions.AuthenticationFailed(
                'No usable model to match nameid=%r' % name_id)

        try:
            return UserSAMLIdentifier.objects.get(name_id=name_id).user
        except UserSAMLIdentifier.DoesNotExist:
            raise exceptions.AuthenticationFailed(
                'No user matches nameid=%r' % name_id)

    def get_orig_key(self, orig):
        """Return the secret of the known service whose ``verif_orig`` is ``orig``.

        Raises ``AuthenticationFailed`` when ``settings.KNOWN_SERVICES`` is
        not defined or when no matching service has a non-empty secret.
        """
        if not hasattr(settings, 'KNOWN_SERVICES'):
            self.logger.warning('no known services')
            raise exceptions.AuthenticationFailed('No KNOWN_SERVICES setting')

        known_services = settings.KNOWN_SERVICES
        for service_id in known_services:
            for service in known_services[service_id].values():
                secret = service.get('secret')
                if secret and service.get('verif_orig') == orig:
                    return secret

        self.logger.warning('no secret found for origin %r', orig)
        raise exceptions.AuthenticationFailed('no secret found for origin %r' % orig)

    def authenticate(self, request):
        """Validate the request signature and return ``(user, None)``.

        Returns ``None`` when the ``orig`` or ``signature`` query parameter
        is missing or empty.  Raises ``AuthenticationFailed`` on an invalid
        signature, an unknown origin or an unresolvable user.
        """
        signed_path = request.get_full_path()
        params = request.GET

        origin = params.get('orig')
        if not origin or not params.get('signature'):
            return None

        secret = self.get_orig_key(origin)
        if signature.check_url(signed_path, secret):
            user = self.resolve_user(request)
            self.logger.info('user authenticated with signature %s', user)
            return (user, None)

        self.logger.warning('invalid signature')
        raise exceptions.AuthenticationFailed('Invalid signature')
|