auth_saml: add more mapping actions in A2_ATTRIBUTE_MAPPING (#35302)

This commit is contained in:
Benjamin Dauvergne 2019-08-08 17:38:37 +02:00
parent 94486a726b
commit 40307f519c
2 changed files with 232 additions and 29 deletions

View File

@ -14,28 +14,74 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import logging
from django.utils import six
from django.core.exceptions import MultipleObjectsReturned
from django.db.transaction import atomic
from mellon.adapters import DefaultAdapter, UserCreationError
from mellon.utils import get_setting
from authentic2 import utils
from authentic2.utils.evaluate import evaluate_condition
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU
logger = logging.getLogger('authentic2.auth_saml')
@six.python_2_unicode_compatible
class MappingError(Exception):
details = None
def __init__(self, message, details=None):
if details:
self.details = details
super(MappingError, self).__init__(message)
def __str__(self):
s = six.text_type(self.args[0])
if self.details:
s += ' ' + repr(self.details)
return s
class SamlConditionContextProxy(object):
def __init__(self, saml_attributes):
self.saml_attributes = saml_attributes
def __getitem__(self, key):
if key.endswith('__list'):
return self.saml_attributes[key[:-len('__list')]]
else:
v = self.saml_attributes[key]
if isinstance(v, list):
return v[0] if v else None
else:
return v
class AuthenticAdapter(DefaultAdapter):
def create_user(self, user_class):
return user_class.objects.create()
def finish_create_user(self, idp, saml_attributes, user):
self.provision_a2_attributes(user, idp, saml_attributes, do_raise=True)
try:
self.provision_a2_attributes(user, idp, saml_attributes)
except MappingError as e:
raise UserCreationError('user creation failed on a mandatory mapping action: %s' % e)
def provision(self, user, idp, saml_attributes):
super(AuthenticAdapter, self).provision(user, idp, saml_attributes)
self.provision_a2_attributes(user, idp, saml_attributes)
try:
self.provision_a2_attributes(user, idp, saml_attributes)
except MappingError as e:
logger.warning('auth_saml: failure during attribute provisionning %s', e)
def provision_a2_attributes(self, user, idp, saml_attributes, do_raise=False):
@atomic
def provision_a2_attributes(self, user, idp, saml_attributes):
'''Copy incoming SAML attributes to user attributes, A2_ATTRIBUTE_MAPPING must be a list of
dictionaries like:
@ -50,40 +96,71 @@ class AuthenticAdapter(DefaultAdapter):
mandatory, login will fail.
'''
saml_attributes = saml_attributes.copy()
attribute_mapping = get_setting(idp, 'A2_ATTRIBUTE_MAPPING', [])
if not attribute_mapping:
return
if not isinstance(attribute_mapping, list):
raise MappingError('invalid A2_ATTRIBUTE_MAPPING')
user_modified = False
for mapping in attribute_mapping:
attribute = mapping['attribute']
saml_attribute = mapping['saml_attribute']
mandatory = mapping.get('mandatory', False)
logger.debug('auth_saml: trying mapping attribute from %r to %r', saml_attribute, attribute,
extra={'user': user})
if saml_attribute not in saml_attributes:
if mandatory:
logger.error('auth_saml: mandatory saml attribute %r is missing', saml_attribute,
extra={'attributes': repr(saml_attributes), 'user': user})
if do_raise:
raise UserCreationError('missing saml_attribute %r' % saml_attribute)
else:
logger.debug('auth_saml: saml_attribute %r not found', saml_attribute, extra={'user': user})
continue
if not isinstance(mapping, dict):
raise MappingError('invalid mapping action', details={'mapping': mapping})
action = mapping.get('action', 'set-attribute')
mandatory = mapping.get('mandatory', False) is True
method = None
if isinstance(action, six.string_types):
try:
method = getattr(self, 'action_' + action.replace('-', '_'))
except AttributeError:
pass
if not method:
raise MappingError('invalid action %r' % action)
try:
value = saml_attributes[saml_attribute]
if self.set_user_attribute(user, attribute, value):
logger.debug('auth_saml: applying provisionning mapping %s', mapping)
if method(user, idp, saml_attributes, mapping):
user_modified = True
except Exception as e:
logger.error(u'failed to set attribute %r from saml attribute %r with value %r: %s',
attribute, saml_attribute, value, e,
extra={'attributes': repr(saml_attributes), 'user': user})
if mandatory and do_raise:
raise UserCreationError('could not set attribute %s' % attribute, e)
except MappingError as e:
if mandatory:
# it's mandatory, provisionning should fail completely
raise e
else:
logger.debug('auth_saml: action mapping %r failed: %s', mapping, e)
if user_modified:
user.save()
def action_rename(self, user, idp, saml_attributes, mapping):
from_name = mapping.get('from')
if not from_name or not isinstance(from_name, six.string_types):
raise MappingError('missing from in rename')
to_name = mapping.get('to')
if not to_name or not isinstance(to_name, six.string_types):
raise MappingError('missing to in rename')
if from_name in saml_attributes:
saml_attributes[to_name] = saml_attributes[from_name]
def action_set_attribute(self, user, idp, saml_attributes, mapping):
attribute = mapping.get('attribute')
if not attribute or not isinstance(attribute, six.string_types):
raise MappingError('missing attribute key')
saml_attribute = mapping.get('saml_attribute')
if not saml_attribute or not isinstance(saml_attribute, six.string_types):
raise MappingError('missing saml_attribute key')
if saml_attribute not in saml_attributes:
raise MappingError('unknown saml_attribute', details={'saml_attribute': saml_attribute})
value = saml_attributes[saml_attribute]
return self.set_user_attribute(user, attribute, value)
def set_user_attribute(self, user, attribute, value):
if isinstance(value, list):
if len(value) > 1:
raise ValueError('too much values')
raise MappingError('too much values')
value = value[0]
if attribute in ('first_name', 'last_name', 'email', 'username'):
if getattr(user, attribute) != value:
@ -97,5 +174,86 @@ class AuthenticAdapter(DefaultAdapter):
return True
return False
def get_ou(self, role_desc):
ou_desc = role_desc.get('ou')
if ou_desc is None:
return None
if not isinstance(ou_desc, dict):
raise MappingError('invalid ou description')
slug = ou_desc.get('slug')
name = ou_desc.get('name')
if slug:
if not isinstance(slug, six.string_types):
raise MappingError('invalid ou.slug in ou description')
try:
return OU.objects.get(slug=slug)
except OU.DoesNotExist:
raise MappingError('unknown ou', details={'slug': slug})
elif name:
if not isinstance(name, six.string_types):
raise MappingError('invalid ou.slug in ou description')
try:
return OU.objects.get(name=name)
except OU.DoesNotExist:
raise MappingError('unknown ou', details={'name': name})
else:
raise MappingError('invalid ou description')
def get_role(self, mapping):
role_desc = mapping.get('role')
if not role_desc or not isinstance(role_desc, dict):
raise MappingError('missing or invalid role description')
slug = role_desc.get('slug')
name = role_desc.get('name')
ou = self.get_ou(role_desc)
kwargs = {}
if ou:
kwargs['ou'] = ou
if slug:
if not isinstance(slug, six.string_types):
raise MappingError('invalid role slug', details={'slug': slug})
kwargs['slug'] = slug
elif name:
if not isinstance(name, six.string_types):
raise MappingError('invalid role name', details={'name': name})
kwargs['name'] = name
else:
raise MappingError('invalid role description')
try:
return Role.objects.get(**kwargs)
except Role.DoesNotExist:
raise MappingError('unknown role', details=kwargs)
except MultipleObjectsReturned:
raise MappingError('ambiuous role description', details=kwargs)
def evaluate_condition(self, user, saml_attributes, mapping):
condition = mapping.get('condition')
if condition is None:
return True
if not isinstance(condition, six.string_types):
raise MappingError('invalid condition')
try:
# use a proxy to simplify condition expressions as subscript is forbidden
# you can write "email == 'a@b.com'" but also "'a@b.com' in email__list"
value = evaluate_condition(condition, SamlConditionContextProxy(saml_attributes))
logger.debug('auth_saml: condition %r is %s', condition, value, extra={'user': user})
return value
except Exception as e:
raise MappingError('condition evaluation failed', details={'error': six.text_type(e)})
def action_toggle_role(self, user, idp, saml_attributes, mapping):
role = self.get_role(mapping)
if self.evaluate_condition(user, saml_attributes, mapping):
if role not in user.roles.all():
logger.info('auth_saml: adding role "%s"', role, extra={'user': user})
user.roles.add(role)
else:
if role in user.roles.all():
logger.info('auth_saml: removing role "%s"', role, extra={'user': user})
user.roles.remove(role)
def auth_login(self, request, user):
utils.login(request, user, 'saml')

View File

@ -24,7 +24,7 @@ from django.contrib.auth import get_user_model
from authentic2.models import Attribute
def test_provision_attributes(db, caplog):
def test_provision_attributes(db, caplog, simple_role):
from authentic2_auth_saml.adapters import AuthenticAdapter
adapter = AuthenticAdapter()
@ -39,10 +39,29 @@ def test_provision_attributes(db, caplog):
'saml_attribute': 'mail',
'mandatory': True,
},
{
'action': 'rename',
'from': 'http://fucking/attribute/givenName',
'to': 'first_name'
},
{
'attribute': 'title',
'saml_attribute': 'title',
},
{
'attribute': 'first_name',
'saml_attribute': 'first_name',
},
{
'action': 'toggle-role',
'role': {
'name': simple_role.name,
'ou': {
'name': simple_role.ou.name,
},
},
'condition': "roles == 'A'",
}
]
}
@ -50,13 +69,39 @@ def test_provision_attributes(db, caplog):
u'issuer': 'https://idp.com/',
u'name_id_content': 'xxx',
u'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
u'mail': u'john.doe@example.com',
u'title': u'Mr.',
u'mail': [u'john.doe@example.com'],
u'title': [u'Mr.'],
u'http://fucking/attribute/givenName': ['John'],
}
user = adapter.lookup_user(idp, saml_attributes)
user.refresh_from_db()
assert user.email == 'john.doe@example.com'
assert user.attributes.title == 'Mr.'
assert user.first_name == 'John'
assert simple_role not in user.roles.all()
user.delete()
# if a toggle-role is mandatory, failure to evaluate condition block user creation
assert idp['A2_ATTRIBUTE_MAPPING'][-1]['action'] == 'toggle-role'
idp['A2_ATTRIBUTE_MAPPING'][-1]['mandatory'] = True
assert adapter.lookup_user(idp, saml_attributes) is None
saml_attributes['roles'] = ['A']
user = adapter.lookup_user(idp, saml_attributes)
user.refresh_from_db()
assert simple_role in user.roles.all()
user.delete()
idp['A2_ATTRIBUTE_MAPPING'][-1]['condition'] = "'A' in roles__list"
user = adapter.lookup_user(idp, saml_attributes)
user.refresh_from_db()
assert simple_role in user.roles.all()
saml_attributes['roles'] = []
adapter.provision(user, idp, saml_attributes)
# condition failed, so role should be removed
assert simple_role not in user.roles.all()
user.delete()
# on missing mandatory attribute, no user is created