import logging
import uuid
from xml.etree import ElementTree as ET

import lasso
import requests
import requests.exceptions

from django.core.exceptions import PermissionDenied
from django.contrib import auth
from django.contrib.auth.models import Group
from django.utils import six
from django.utils.encoding import force_text

from . import utils, app_settings, models

User = auth.get_user_model()


class UserCreationError(Exception):
    '''Raised by finish_create_user() when a new user cannot be finalized.'''


class DefaultAdapter(object):
    '''Default adapter between SAML assertions and Django users.

    It resolves identity provider (IdP) definitions from the application
    settings, authorizes logins, looks up or creates the matching user and
    provisions its attributes, superuser flags and groups.
    '''

    def __init__(self, *args, **kwargs):
        '''Accept and ignore any arguments; only set up the logger.'''
        self.logger = logging.getLogger(__name__)

    def get_idp(self, entity_id):
        '''Find the first IdP definition matching entity_id.

        Returns the IdP dict, or None when no usable IdP has that entity id.
        '''
        for idp in self.get_idps():
            if entity_id == idp['ENTITY_ID']:
                return idp
        return None

    def get_identity_providers_setting(self):
        '''Return the raw list of IdP definitions from the app settings.'''
        return app_settings.IDENTITY_PROVIDERS

    def get_idps(self):
        '''Yield every IdP definition that has usable metadata.

        For each definition the metadata is loaded when needed (downloaded
        from METADATA_URL, or read from disk when METADATA starts with '/')
        and ENTITY_ID is extracted from the metadata root element when it is
        not already set.  The loaded values are stored back into the IdP
        dict itself, so they are not recomputed as long as that dict is
        reused.  Definitions that cannot be resolved are logged and skipped.
        '''
        for i, idp in enumerate(self.get_identity_providers_setting()):
            if 'METADATA_URL' in idp and 'METADATA' not in idp:
                verify_ssl_certificate = utils.get_setting(
                    idp, 'VERIFY_SSL_CERTIFICATE')
                # NOTE(review): no timeout is passed to requests.get(), so a
                # stalled IdP can block this generator indefinitely.
                try:
                    response = requests.get(
                        idp['METADATA_URL'], verify=verify_ssl_certificate)
                    response.raise_for_status()
                except requests.exceptions.RequestException as e:
                    self.logger.error(
                        u'retrieval of metadata URL %r failed with error %s for %d-th idp',
                        idp['METADATA_URL'], e, i)
                    continue
                idp['METADATA'] = response.text
            elif 'METADATA' in idp:
                # A leading '/' means METADATA holds a file path, not XML.
                if idp['METADATA'].startswith('/'):
                    with open(idp['METADATA']) as metadata_file:
                        idp['METADATA'] = metadata_file.read()
            else:
                self.logger.error(u'missing METADATA or METADATA_URL in %d-th idp', i)
                continue

            if 'ENTITY_ID' not in idp:
                try:
                    doc = ET.fromstring(idp['METADATA'])
                except (TypeError, ET.ParseError):
                    self.logger.error(u'METADATA of %d-th idp is invalid', i)
                    continue
                if doc.tag != '{%s}EntityDescriptor' % lasso.SAML2_METADATA_HREF:
                    self.logger.error(
                        u'METADATA of %d-th idp has no EntityDescriptor root tag', i)
                    continue
                if 'entityID' not in doc.attrib:
                    self.logger.error(
                        u'METADATA of %d-th idp has no entityID attribute on its root tag', i)
                    continue
                idp['ENTITY_ID'] = doc.attrib['entityID']
            yield idp

    def authorize(self, idp, saml_attributes):
        '''Tell whether a login coming from idp is allowed.

        Returns False when idp is empty and True otherwise.  Raises
        PermissionDenied when the IdP has an AUTHN_CLASSREF setting and the
        authentication context class reference of the assertion is missing
        or not part of it.
        '''
        if not idp:
            return False
        required_classref = utils.get_setting(idp, 'AUTHN_CLASSREF')
        if required_classref:
            # .get(): an absent value is refused exactly like an explicit None.
            given_classref = saml_attributes.get('authn_context_class_ref')
            if given_classref is None or given_classref not in required_classref:
                raise PermissionDenied
        return True

    def format_username(self, idp, saml_attributes):
        '''Build a username from the USERNAME_TEMPLATE setting of the IdP.

        The template is rendered with str.format() using the keywords
        realm, attributes and idp, then cut to 30 characters.  Returns None
        (after logging) when the template cannot be rendered.
        '''
        realm = utils.get_setting(idp, 'REALM')
        username_template = utils.get_setting(idp, 'USERNAME_TEMPLATE')
        try:
            username = force_text(username_template).format(
                realm=realm, attributes=saml_attributes, idp=idp)[:30]
        except ValueError:
            self.logger.error(u'invalid username template %r', username_template)
        except (AttributeError, KeyError, IndexError) as e:
            self.logger.error(
                u'invalid reference in username template %r: %s', username_template, e)
        except Exception:
            # Best effort: anything else is logged with its traceback and
            # reported to the caller as "no username".
            self.logger.exception(u'unknown error when formatting username')
        else:
            return username
        return None

    def create_user(self, user_class):
        '''Create and return a user with a random 30 character placeholder username.'''
        return user_class.objects.create(username=uuid.uuid4().hex[:30])

    def finish_create_user(self, idp, saml_attributes, user):
        '''Give a newly created user its definitive username and save it.

        Raises UserCreationError when no username can be built.
        '''
        username = self.format_username(idp, saml_attributes)
        if not username:
            self.logger.warning('could not build a username, login refused')
            raise UserCreationError
        user.username = username
        # Persist the new name; otherwise the placeholder username set by
        # create_user() would remain in the database.
        user.save()

    def lookup_user(self, idp, saml_attributes):
        '''Return the user matching the assertion, creating it if allowed.

        The user is searched by (name_id, issuer).  With a transient NameID
        format, the name id is taken from the attribute named by the
        TRANSIENT_FEDERATION_ATTRIBUTE setting instead; without a single
        usable value for it, no lookup is possible.  When no user matches
        and the PROVISION setting is enabled, a new user is created and
        linked.

        Returns the user, or None when the login must be refused.
        '''
        transient_federation_attribute = utils.get_setting(
            idp, 'TRANSIENT_FEDERATION_ATTRIBUTE')
        if saml_attributes['name_id_format'] == lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT:
            if not (transient_federation_attribute
                    and saml_attributes.get(transient_federation_attribute)):
                return None
            name_id = saml_attributes[transient_federation_attribute]
            if not isinstance(name_id, six.string_types):
                # Multi-valued attribute: only a single value is unambiguous.
                if len(name_id) != 1:
                    self.logger.warning(
                        'more than one value for attribute %r, cannot federate',
                        transient_federation_attribute)
                    return None
                name_id = name_id[0]
        else:
            name_id = saml_attributes['name_id_content']
        issuer = saml_attributes['issuer']

        try:
            return User.objects.get(saml_identifiers__name_id=name_id,
                                    saml_identifiers__issuer=issuer)
        except User.DoesNotExist:
            pass

        if not utils.get_setting(idp, 'PROVISION'):
            self.logger.warning('provisionning disabled, login refused')
            return None

        user = self.create_user(User)
        nameid_user = self._link_user(idp, saml_attributes, issuer, name_id, user)
        if user != nameid_user:
            # The identifier was already linked to another user (it appeared
            # between our lookup and the link): drop the user we just made.
            self.logger.info('looked up user %s with name_id %s from issuer %s',
                             nameid_user, name_id, issuer)
            user.delete()
            return nameid_user

        try:
            self.finish_create_user(idp, saml_attributes, nameid_user)
        except UserCreationError:
            nameid_user.delete()
            return None

        self.logger.info('created new user %s with name_id %s from issuer %s',
                         nameid_user, name_id, issuer)
        return nameid_user

    def _link_user(self, idp, saml_attributes, issuer, name_id, user):
        '''Link (name_id, issuer) to user unless it is already linked.

        Returns user when a new link was created, otherwise the user the
        existing link points to.
        '''
        saml_id, created = models.UserSAMLIdentifier.objects.get_or_create(
            name_id=name_id, issuer=issuer, defaults={'user': user})
        if created:
            return user
        return saml_id.user

    def provision(self, user, idp, saml_attributes):
        '''Update the fields, superuser flags and groups of user, in that order.'''
        self.provision_attribute(user, idp, saml_attributes)
        self.provision_superuser(user, idp, saml_attributes)
        self.provision_groups(user, idp, saml_attributes)

    def provision_attribute(self, user, idp, saml_attributes):
        '''Set user fields from the ATTRIBUTE_MAPPING setting of the IdP.

        The mapping associates a field name with a str.format() template
        rendered with the keywords realm, attributes and idp.  Templates
        that cannot be rendered are logged and skipped.  The user is saved
        only when at least one field actually changed.
        '''
        realm = utils.get_setting(idp, 'REALM')
        attribute_mapping = utils.get_setting(idp, 'ATTRIBUTE_MAPPING')
        attribute_set = False
        for field, tpl in attribute_mapping.items():
            try:
                value = force_text(tpl).format(
                    realm=realm, attributes=saml_attributes, idp=idp)
            except ValueError:
                self.logger.warning(u'invalid attribute mapping template %r', tpl)
            except (AttributeError, KeyError, IndexError) as e:
                self.logger.warning(
                    u'invalid reference in attribute mapping template %r: %s', tpl, e)
            else:
                model_field = user._meta.get_field(field)
                if hasattr(model_field, 'max_length'):
                    # Slicing with a max_length of None keeps the full value.
                    value = value[:model_field.max_length]
                old_value = getattr(user, field)
                if old_value != value:
                    setattr(user, field, value)
                    attribute_set = True
                    self.logger.info(
                        u'set field %s of user %s to value %r (old value %r)',
                        field, user, value, old_value)
        if attribute_set:
            user.save()

    def provision_superuser(self, user, idp, saml_attributes):
        '''Grant or revoke is_staff and is_superuser from SUPERUSER_MAPPING.

        The mapping associates an attribute name with one expected value or
        a list of them.  The flags are granted as soon as one received
        attribute shares a value with its expected values, and revoked when
        no entry matches.  Nothing is done when the mapping is empty.  The
        user is saved only when a flag changed.
        '''
        superuser_mapping = utils.get_setting(idp, 'SUPERUSER_MAPPING')
        if not superuser_mapping:
            return
        attribute_set = False
        for key, values in superuser_mapping.items():
            if key not in saml_attributes:
                continue
            if not isinstance(values, (tuple, list)):
                values = [values]
            values = set(values)
            attribute_values = saml_attributes[key]
            if not isinstance(attribute_values, (tuple, list)):
                attribute_values = [attribute_values]
            attribute_values = set(attribute_values)
            if attribute_values & values:
                if not (user.is_staff and user.is_superuser):
                    user.is_staff = True
                    user.is_superuser = True
                    attribute_set = True
                    self.logger.info(
                        'flag is_staff and is_superuser added to user %s', user)
                break
        else:
            # for/else: reached only when no mapping entry matched.
            if user.is_superuser or user.is_staff:
                user.is_staff = False
                user.is_superuser = False
                self.logger.info(
                    'flag is_staff and is_superuser removed from user %s', user)
                attribute_set = True
        if attribute_set:
            user.save()

    def provision_groups(self, user, idp, saml_attributes):
        '''Synchronize the groups of user with the GROUP_ATTRIBUTE values.

        Nothing is done when the attribute is absent from the assertion.
        Otherwise each value is resolved to a Group by name (created on the
        fly when the CREATE_GROUP setting is enabled, ignored when unknown
        otherwise); the user is added to the resolved groups it is missing
        from and removed from every other group.
        '''
        group_attribute = utils.get_setting(idp, 'GROUP_ATTRIBUTE')
        create_group = utils.get_setting(idp, 'CREATE_GROUP')
        if group_attribute not in saml_attributes:
            return
        values = saml_attributes[group_attribute]
        if not isinstance(values, (list, tuple)):
            values = [values]

        groups = []
        for value in set(values):
            if create_group:
                group, created = Group.objects.get_or_create(name=value)
            else:
                try:
                    group = Group.objects.get(name=value)
                except Group.DoesNotExist:
                    continue
            groups.append(group)
        group_pks = [g.pk for g in groups]

        # Add the memberships the user does not have yet.
        for group in Group.objects.filter(pk__in=group_pks).exclude(user=user):
            self.logger.info(
                u'adding group %s (%s) to user %s (%s)', group, group.pk, user, user.pk)
            User.groups.through.objects.get_or_create(group=group, user=user)

        # Drop the memberships that are not backed by the assertion anymore.
        qs = User.groups.through.objects.exclude(
            group__pk__in=group_pks).filter(user=user)
        for rel in qs:
            self.logger.info('removing group %s (%s) from user %s (%s)',
                             rel.group, rel.group.pk, rel.user, rel.user.pk)
        qs.delete()