# authentic/src/authentic2_auth_saml/adapters.py
#
# 283 lines
# 11 KiB
# Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import logging
from django.utils import six
from django.contrib import messages
from django.core.exceptions import MultipleObjectsReturned
from django.db.transaction import atomic
from django.utils.translation import ugettext as _
from mellon.adapters import DefaultAdapter, UserCreationError
from mellon.utils import get_setting
from authentic2 import utils
from authentic2.backends import get_user_queryset
from authentic2.utils.evaluate import evaluate_condition
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU
from authentic2.a2_rbac.utils import get_default_ou
# Module-level logger; every log call in this module goes through the
# 'authentic2.auth_saml' channel.
logger = logging.getLogger('authentic2.auth_saml')
class MappingError(Exception):
    '''Error raised when a provisioning mapping is invalid or cannot be applied.

    The exception carries, next to its message, a ``details`` dictionary with
    extra context; when that dictionary is not empty its repr is appended to
    the message by ``__str__``.
    '''

    # Class-level fallback only; __init__ always installs a dictionary.
    details = None

    def __init__(self, message, details=None):
        '''Store the message and the optional dictionary of details.'''
        super(MappingError, self).__init__(message)
        self.details = details if details else {}

    def __str__(self):
        '''Return the message, followed by the repr of the details if any.'''
        text = six.text_type(self.args[0])
        if not self.details:
            return text
        return text + ' ' + repr(self.details)
class SamlConditionContextProxy(object):
    '''Subscriptable view over SAML attributes used as a condition context.

    Looking up ``name`` returns the first item when the stored value is a list
    (``None`` for an empty list) and the value itself otherwise; looking up
    ``name__list`` returns the value stored under ``name`` untouched.
    '''

    def __init__(self, saml_attributes):
        '''Keep a reference to the wrapped attributes mapping.'''
        self.saml_attributes = saml_attributes

    def __getitem__(self, key):
        '''Resolve ``key`` against the wrapped attributes.

        A missing attribute propagates whatever the wrapped mapping raises.
        '''
        suffix = '__list'
        if key.endswith(suffix):
            # explicit request for the raw (multi-valued) attribute
            return self.saml_attributes[key[:len(key) - len(suffix)]]
        value = self.saml_attributes[key]
        if not isinstance(value, list):
            return value
        if value:
            return value[0]
        return None
class AuthenticAdapter(DefaultAdapter):
    '''Adapter built on mellon's DefaultAdapter that provisions authentic2 users.

    In addition to the inherited behaviour, it applies the list of actions
    configured through the A2_ATTRIBUTE_MAPPING setting of the identity
    provider. Each action is a dictionary dispatched on its ``action`` key to
    the ``action_<name>`` method of this class (dashes in the name are
    replaced by underscores).
    '''

    def create_user(self, user_class):
        '''Instantiate ``user_class``, give it an unusable password and save it.'''
        user = user_class()
        user.set_unusable_password()
        user.save()
        return user

    def finish_create_user(self, idp, saml_attributes, user):
        '''Apply the attribute mapping to a newly created user.

        A MappingError is logged, reported through the messages framework when
        a request is available, and converted into UserCreationError. When the
        user has no organizational unit afterwards, the default one is set.
        '''
        try:
            self.provision_a2_attributes(user, idp, saml_attributes)
        except MappingError as e:
            logger.warning('auth_saml: user creation failed on a mandatory mapping action, %s', e)
            if self.request:
                messages.error(self.request, _('user creation failed on a mandatory mapping action: %s') % e)
            raise UserCreationError
        if not user.ou:
            user.ou = get_default_ou()
            user.save()

    def provision(self, user, idp, saml_attributes):
        '''Run the inherited provisioning, then the attribute mapping.

        Unlike in finish_create_user(), a MappingError is only logged here.
        '''
        super(AuthenticAdapter, self).provision(user, idp, saml_attributes)
        try:
            self.provision_a2_attributes(user, idp, saml_attributes)
        except MappingError as e:
            logger.warning('auth_saml: failure during attribute provisionning %s', e)

    @atomic
    def provision_a2_attributes(self, user, idp, saml_attributes):
        '''Copy incoming SAML attributes to user attributes, A2_ATTRIBUTE_MAPPING must be a list of
           dictionaries like:

              {
                  'attribute': 'email',
                  'saml_attribute': 'email',
                  # optional:
                  'mandatory': False,
              }

           If an attribute is not mandatory any error is just logged, if the attribute is
           mandatory, login will fail.

           The optional 'action' key selects the action_* method to run, it
           defaults to 'set-attribute'.

           Raises MappingError if the setting is not a list or if a mandatory
           action fails. The user is saved only if an action reported a change.
        '''
        # work on a copy, actions such as rename must not alter the caller's dict
        saml_attributes = saml_attributes.copy()
        attribute_mapping = get_setting(idp, 'A2_ATTRIBUTE_MAPPING', [])
        if not attribute_mapping:
            return
        if not isinstance(attribute_mapping, list):
            raise MappingError('invalid A2_ATTRIBUTE_MAPPING')
        if self.apply_attribute_mapping(user, idp, saml_attributes, attribute_mapping):
            user.save()

    def apply_attribute_mapping(self, user, idp, saml_attributes, attribute_mapping):
        '''Run every mapping action in order.

        Returns True if at least one action returned a true value, meaning the
        user object was modified and should be saved by the caller.

        Raises MappingError if an entry is not a dictionary or if an action
        flagged as mandatory (``'mandatory': True``) fails; failures of other
        actions are logged and skipped.
        '''
        user_modified = False
        for mapping in attribute_mapping:
            if not isinstance(mapping, dict):
                raise MappingError('invalid mapping action', details={'mapping': mapping})
            action = mapping.get('action', 'set-attribute')
            # only the literal True makes an action mandatory
            mandatory = mapping.get('mandatory', False) is True
            method = None
            if isinstance(action, six.string_types):
                method = getattr(self, 'action_' + action.replace('-', '_'), None)
            try:
                if not method:
                    raise MappingError('invalid action')
                logger.debug('auth_saml: applying provisionning mapping %s', mapping)
                if method(user, idp, saml_attributes, mapping):
                    user_modified = True
            except MappingError as e:
                e.details['mapping'] = mapping
                if mandatory:
                    # it's mandatory, provisionning should fail completely
                    raise
                logger.warning('auth_saml: mapping action failed: %s', e)
        return user_modified

    def action_rename(self, user, idp, saml_attributes, mapping):
        '''Copy the SAML attribute named by ``from`` under the name given by ``to``.

        Nothing happens if the source attribute is absent. Raises MappingError
        if ``from`` or ``to`` is missing or not a string.
        '''
        from_name = mapping.get('from')
        if not from_name or not isinstance(from_name, six.string_types):
            raise MappingError('missing from in rename')
        to_name = mapping.get('to')
        if not to_name or not isinstance(to_name, six.string_types):
            raise MappingError('missing to in rename')
        if from_name in saml_attributes:
            saml_attributes[to_name] = saml_attributes[from_name]

    def action_set_attribute(self, user, idp, saml_attributes, mapping):
        '''Set the user attribute ``attribute`` from the SAML attribute ``saml_attribute``.

        Returns the result of set_user_attribute(). Raises MappingError if a
        key is missing or not a string, or if the SAML attribute is absent.
        '''
        attribute = mapping.get('attribute')
        if not attribute or not isinstance(attribute, six.string_types):
            raise MappingError('missing attribute key')
        saml_attribute = mapping.get('saml_attribute')
        if not saml_attribute or not isinstance(saml_attribute, six.string_types):
            raise MappingError('missing saml_attribute key')
        if saml_attribute not in saml_attributes:
            raise MappingError('unknown saml_attribute')
        value = saml_attributes[saml_attribute]
        return self.set_user_attribute(user, attribute, value)

    def set_user_attribute(self, user, attribute, value):
        '''Assign ``value`` to ``attribute`` of the user if it differs.

        A list value must hold exactly one item, which is then used as the
        value, otherwise MappingError is raised. The attributes first_name,
        last_name, email and username are set on the user itself, any other
        one on ``user.attributes``.

        Returns True if the value changed, False otherwise.
        '''
        if isinstance(value, list):
            if not value:
                raise MappingError('no value')
            if len(value) > 1:
                raise MappingError('too many values', details={'value': value})
            value = value[0]
        if attribute in ('first_name', 'last_name', 'email', 'username'):
            target = user
        else:
            target = user.attributes
        if getattr(target, attribute) != value:
            logger.info('auth_saml: attribute %r set to %r', attribute, value, extra={'user': user})
            setattr(target, attribute, value)
            return True
        return False

    def get_ou(self, role_desc):
        '''Return the organizational unit described by ``role_desc['ou']``.

        The description is a dictionary with a ``slug`` or a ``name`` key, the
        slug being used when both are present. Returns None when there is no
        ``ou`` key. Raises MappingError if the description is invalid or if no
        organizational unit matches.
        '''
        ou_desc = role_desc.get('ou')
        if ou_desc is None:
            return None
        if not isinstance(ou_desc, dict):
            raise MappingError('invalid ou description')
        slug = ou_desc.get('slug')
        name = ou_desc.get('name')
        if slug:
            if not isinstance(slug, six.string_types):
                raise MappingError('invalid ou.slug in ou description')
            try:
                return OU.objects.get(slug=slug)
            except OU.DoesNotExist:
                raise MappingError('unknown ou', details={'slug': slug})
        elif name:
            if not isinstance(name, six.string_types):
                # the faulty key here is the name, not the slug
                raise MappingError('invalid ou.name in ou description')
            try:
                return OU.objects.get(name=name)
            except OU.DoesNotExist:
                raise MappingError('unknown ou', details={'name': name})
        else:
            raise MappingError('invalid ou description')

    def get_role(self, mapping):
        '''Return the role described by ``mapping['role']``.

        The description is a dictionary with a ``slug`` or a ``name`` key (the
        slug is used when both are present) and an optional ``ou`` description
        restricting the lookup, see get_ou(). Raises MappingError if the
        description is invalid or if it matches no role or more than one.
        '''
        role_desc = mapping.get('role')
        if not role_desc or not isinstance(role_desc, dict):
            raise MappingError('missing or invalid role description')
        slug = role_desc.get('slug')
        name = role_desc.get('name')
        ou = self.get_ou(role_desc)

        kwargs = {}
        if ou:
            kwargs['ou'] = ou

        if slug:
            if not isinstance(slug, six.string_types):
                raise MappingError('invalid role slug', details={'slug': slug})
            kwargs['slug'] = slug
        elif name:
            if not isinstance(name, six.string_types):
                raise MappingError('invalid role name', details={'name': name})
            kwargs['name'] = name
        else:
            raise MappingError('invalid role description')

        try:
            return Role.objects.get(**kwargs)
        except Role.DoesNotExist:
            raise MappingError('unknown role', details=kwargs)
        except MultipleObjectsReturned:
            raise MappingError('ambiguous role description', details=kwargs)

    def evaluate_condition(self, user, saml_attributes, mapping):
        '''Evaluate the optional ``condition`` expression of a mapping.

        Returns True when the mapping has no condition, otherwise the result
        of the expression evaluated against the SAML attributes. Raises
        MappingError if the condition is not a string or if its evaluation
        raises any exception.
        '''
        condition = mapping.get('condition')
        if condition is None:
            return True
        if not isinstance(condition, six.string_types):
            raise MappingError('invalid condition')
        try:
            # use a proxy to simplify condition expressions as subscript is forbidden
            # you can write "email == 'a@b.com'" but also "'a@b.com' in email__list"
            value = evaluate_condition(condition, SamlConditionContextProxy(saml_attributes))
            logger.debug('auth_saml: condition %r is %s', condition, value, extra={'user': user})
            return value
        except Exception as e:
            raise MappingError('condition evaluation failed', details={'error': six.text_type(e)})

    def action_add_role(self, user, idp, saml_attributes, mapping):
        '''Add the described role to the user if the condition holds, else remove it.

        The role is only added or removed when the membership actually has to
        change. Nothing is returned, so this action never asks the caller to
        save the user.
        '''
        role = self.get_role(mapping)
        if self.evaluate_condition(user, saml_attributes, mapping):
            if role not in user.roles.all():
                logger.info('auth_saml: adding role "%s"', role, extra={'user': user})
                user.roles.add(role)
        else:
            if role in user.roles.all():
                logger.info('auth_saml: removing role "%s"', role, extra={'user': user})
                user.roles.remove(role)

    def action_toggle_role(self, *args, **kwargs):
        '''Alias of action_add_role().'''
        return self.action_add_role(*args, **kwargs)

    def auth_login(self, request, user):
        '''Log the user in through authentic2's login helper, recording 'saml' as the method.'''
        utils.login(request, user, 'saml')

    def get_users_queryset(self, idp, saml_attributes):
        '''Return authentic2's user queryset; ``idp`` and ``saml_attributes`` are not used.'''
        return get_user_queryset()