trivial: apply black (#51575)

This commit is contained in:
Benjamin Dauvergne 2021-03-02 14:50:18 +01:00
parent 672cfb90a4
commit 5b9bc1ff57
20 changed files with 554 additions and 369 deletions

View File

@ -104,7 +104,9 @@ class DefaultAdapter(object):
logger.error('invalid metadata file retrieved from %s', path)
return
if 'ENTITY_ID' in idp and idp['ENTITY_ID'] != entity_id:
logger.error('metadata path %s : entityID changed %r != %r', path, entity_id, idp['ENTITY_ID'])
logger.error(
'metadata path %s : entityID changed %r != %r', path, entity_id, idp['ENTITY_ID']
)
del idp['ENTITY_ID']
idp['METADATA'] = metadata
@ -160,8 +162,7 @@ class DefaultAdapter(object):
def __http_get():
try:
verify_ssl_certificate = utils.get_setting(
idp, 'VERIFY_SSL_CERTIFICATE')
verify_ssl_certificate = utils.get_setting(idp, 'VERIFY_SSL_CERTIFICATE')
try:
response = requests.get(url, verify=verify_ssl_certificate, timeout=timeout)
response.raise_for_status()
@ -176,7 +177,9 @@ class DefaultAdapter(object):
if 'ENTITY_ID' in idp and idp['ENTITY_ID'] != entity_id:
# entityID change is always en error
logger.error('metadata url %s : entityID changed %r != %r', url, entity_id, idp['ENTITY_ID'])
logger.error(
'metadata url %s : entityID changed %r != %r', url, entity_id, idp['ENTITY_ID']
)
del idp['ENTITY_ID']
idp['METADATA'] = response.text
@ -186,7 +189,9 @@ class DefaultAdapter(object):
with atomic_write(file_cache_path, mode='wb', overwrite=True) as fd:
fd.write(response.text.encode('utf-8'))
except OSError as e:
logger.error('metadata url %s : could not write file cache %s, %s', url, file_cache_path, e)
logger.error(
'metadata url %s : could not write file cache %s, %s', url, file_cache_path, e
)
idp['METADATA_PATH'] = file_cache_path
# prevent reloading of the file cache immediately
idp['METADATA_PATH_LAST_UPDATE'] = time.time() + 1
@ -198,8 +203,9 @@ class DefaultAdapter(object):
if metadata_cache_time:
stale_timeout = 24 * metadata_cache_time
if last_update and (now - idp['METADATA_URL_LAST_UPDATE']) > stale_timeout:
logger.error('metadata url %s: not updated since %.1f hours',
url, stale_timeout / 3600.0)
logger.error(
'metadata url %s: not updated since %.1f hours', url, stale_timeout / 3600.0
)
# we have cache, update in background
if last_update and 'METADATA' in idp:
@ -245,8 +251,7 @@ class DefaultAdapter(object):
return None
if 'entityID' not in doc.attrib:
logger.error(
'METADATA of %d-th idp has no entityID attribute on its root tag', i)
logger.error('METADATA of %d-th idp has no entityID attribute on its root tag', i)
return None
return doc.attrib['entityID']
@ -260,8 +265,7 @@ class DefaultAdapter(object):
required_classref = utils.get_setting(idp, 'AUTHN_CLASSREF')
if required_classref:
given_classref = saml_attributes['authn_context_class_ref']
if given_classref is None or \
given_classref not in required_classref:
if given_classref is None or given_classref not in required_classref:
raise PermissionDenied
return True
@ -269,13 +273,13 @@ class DefaultAdapter(object):
realm = utils.get_setting(idp, 'REALM')
username_template = utils.get_setting(idp, 'USERNAME_TEMPLATE')
try:
username = force_text(username_template).format(
realm=realm, attributes=saml_attributes, idp=idp)[:30]
username = force_text(username_template).format(realm=realm, attributes=saml_attributes, idp=idp)[
:30
]
except ValueError:
logger.error('invalid username template %r', username_template)
except (AttributeError, KeyError, IndexError) as e:
logger.error(
'invalid reference in username template %r: %s', username_template, e)
logger.error('invalid reference in username template %r: %s', username_template, e)
except Exception:
logger.exception('unknown error when formatting username')
else:
@ -295,19 +299,23 @@ class DefaultAdapter(object):
def lookup_user(self, idp, saml_attributes):
transient_federation_attribute = utils.get_setting(idp, 'TRANSIENT_FEDERATION_ATTRIBUTE')
if saml_attributes['name_id_format'] == lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT:
if (transient_federation_attribute
and saml_attributes.get(transient_federation_attribute)):
if transient_federation_attribute and saml_attributes.get(transient_federation_attribute):
name_id = saml_attributes[transient_federation_attribute]
if not isinstance(name_id, six.string_types):
if len(name_id) == 1:
name_id = name_id[0]
else:
logger.warning('more than one value for attribute %r, cannot federate',
transient_federation_attribute)
logger.warning(
'more than one value for attribute %r, cannot federate',
transient_federation_attribute,
)
return None
else:
if self.request:
messages.warning(self.request, _('A transient NameID was received but TRANSIENT_FEDERATION_ATTRIBUTE is not set.'))
messages.warning(
self.request,
_('A transient NameID was received but TRANSIENT_FEDERATION_ATTRIBUTE is not set.'),
)
logger.warning('transient NameID was received but TRANSIENT_FEDERATION_ATTRIBUTE is not set')
return None
else:
@ -315,8 +323,8 @@ class DefaultAdapter(object):
issuer = saml_attributes['issuer']
try:
saml_identifier = models.UserSAMLIdentifier.objects.select_related('user').get(
name_id=name_id,
issuer=issuer)
name_id=name_id, issuer=issuer
)
user = saml_identifier.user
user.saml_identifier = saml_identifier
logger.info('looked up user %s with name_id %s from issuer %s', user, name_id, issuer)
@ -358,13 +366,17 @@ class DefaultAdapter(object):
def _lookup_by_attributes(self, idp, saml_attributes, lookup_by_attributes):
if not isinstance(lookup_by_attributes, list):
logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list', lookup_by_attributes)
logger.error(
'invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list', lookup_by_attributes
)
return None
users = set()
for line in lookup_by_attributes:
if not isinstance(line, dict):
logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list of dicts', line)
logger.error(
'invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list of dicts', line
)
continue
user_field = line.get('user_field')
if not hasattr(user_field, 'isalpha'):
@ -373,8 +385,11 @@ class DefaultAdapter(object):
try:
User._meta.get_field(user_field)
except FieldDoesNotExist:
logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r, user field %s does not exist',
line, user_field)
logger.error(
'invalid LOOKUP_BY_ATTRIBUTES configuration %r, user field %s does not exist',
line,
user_field,
)
continue
saml_attribute = line.get('saml_attribute')
if not hasattr(saml_attribute, 'isalpha'):
@ -384,7 +399,9 @@ class DefaultAdapter(object):
if not values:
logger.warning(
'looking for user by saml attribute %r and user field %r, skipping because empty',
saml_attribute, user_field)
saml_attribute,
user_field,
)
continue
ignore_case = line.get('ignore-case', False)
for value in values:
@ -392,26 +409,40 @@ class DefaultAdapter(object):
if ignore_case:
key += '__iexact'
users_found = self.get_users_queryset(idp, saml_attributes).filter(
saml_identifiers__isnull=True, **{key: value})
saml_identifiers__isnull=True, **{key: value}
)
if not users_found:
logger.debug('looking for users by attribute %r and user field %r with value %r: not found',
saml_attribute, user_field, value)
logger.debug(
'looking for users by attribute %r and user field %r with value %r: not found',
saml_attribute,
user_field,
value,
)
continue
logger.info('looking for user by attribute %r and user field %r with value %r: found %s',
saml_attribute, user_field, value, display_truncated_list(users_found))
logger.info(
'looking for user by attribute %r and user field %r with value %r: found %s',
saml_attribute,
user_field,
value,
display_truncated_list(users_found),
)
users.update(users_found)
if len(users) == 1:
user = list(users)[0]
logger.info('looking for user by attributes %r: found user %s', lookup_by_attributes, user)
return user
elif len(users) > 1:
logger.warning('looking for user by attributes %r: too many users found(%d), failing',
lookup_by_attributes, len(users))
logger.warning(
'looking for user by attributes %r: too many users found(%d), failing',
lookup_by_attributes,
len(users),
)
return None
def _link_user(self, idp, saml_attributes, issuer, name_id, user):
saml_id, created = models.UserSAMLIdentifier.objects.get_or_create(
name_id=name_id, issuer=issuer, defaults={'user': user})
name_id=name_id, issuer=issuer, defaults={'user': user}
)
if created:
user.saml_identifier = saml_id
return user
@ -434,17 +465,18 @@ class DefaultAdapter(object):
except ValueError:
logger.warning('invalid attribute mapping template %r', tpl)
except (AttributeError, KeyError, IndexError, ValueError) as e:
logger.warning(
'invalid reference in attribute mapping template %r: %s', tpl, e)
logger.warning('invalid reference in attribute mapping template %r: %s', tpl, e)
else:
model_field = user._meta.get_field(field)
if hasattr(model_field, 'max_length'):
value = value[:model_field.max_length]
value = value[: model_field.max_length]
if getattr(user, field) != value:
old_value = getattr(user, field)
setattr(user, field, value)
attribute_set = True
logger.info('set field %s of user %s to value %r (old value %r)', field, user, value, old_value)
logger.info(
'set field %s of user %s to value %r (old value %r)', field, user, value, old_value
)
if attribute_set:
user.save()
@ -496,11 +528,11 @@ class DefaultAdapter(object):
continue
groups.append(group)
for group in Group.objects.filter(pk__in=[g.pk for g in groups]).exclude(user=user):
logger.info(
'adding group %s (%s) to user %s (%s)', group, group.pk, user, user.pk)
logger.info('adding group %s (%s) to user %s (%s)', group, group.pk, user, user.pk)
User.groups.through.objects.get_or_create(group=group, user=user)
qs = User.groups.through.objects.exclude(
group__pk__in=[g.pk for g in groups]).filter(user=user)
qs = User.groups.through.objects.exclude(group__pk__in=[g.pk for g in groups]).filter(user=user)
for rel in qs:
logger.info('removing group %s (%s) from user %s (%s)', rel.group, rel.group.pk, rel.user, rel.user.pk)
logger.info(
'removing group %s (%s) from user %s (%s)', rel.group, rel.group.pk, rel.user, rel.user.pk
)
qs.delete()

View File

@ -15,9 +15,7 @@ class AppSettings(object):
'NAME_ID_POLICY_ALLOW_CREATE': True,
'FORCE_AUTHN': False,
'ADD_AUTHNREQUEST_NEXT_URL_EXTENSION': False,
'ADAPTER': (
'mellon.adapters.DefaultAdapter',
),
'ADAPTER': ('mellon.adapters.DefaultAdapter',),
'REALM': 'saml',
'PROVISION': True,
'USERNAME_TEMPLATE': '{attributes[name_id_content]}@{realm}',
@ -48,6 +46,7 @@ class AppSettings(object):
@property
def IDENTITY_PROVIDERS(self):
from django.conf import settings
try:
idps = settings.MELLON_IDENTITY_PROVIDERS
except AttributeError:
@ -58,10 +57,12 @@ class AppSettings(object):
def __getattr__(self, name):
from django.conf import settings
if name not in self.__DEFAULTS:
raise AttributeError
return getattr(settings, self.__PREFIX + name, self.__DEFAULTS[name])
app_settings = AppSettings()
app_settings.__name__ = __name__
sys.modules[__name__] = app_settings

View File

@ -28,9 +28,11 @@ PASSIVE_TRIED_COOKIE = 'MELLON_PASSIVE_TRIED'
class PassiveAuthenticationMiddleware(MiddlewareMixin):
def process_response(self, request, response):
# When unlogged remove the PASSIVE_TRIED cookie
if app_settings.OPENED_SESSION_COOKIE_NAME \
and PASSIVE_TRIED_COOKIE in request.COOKIES \
and app_settings.OPENED_SESSION_COOKIE_NAME not in request.COOKIES:
if (
app_settings.OPENED_SESSION_COOKIE_NAME
and PASSIVE_TRIED_COOKIE in request.COOKIES
and app_settings.OPENED_SESSION_COOKIE_NAME not in request.COOKIES
):
response.delete_cookie(PASSIVE_TRIED_COOKIE)
return response
@ -39,7 +41,9 @@ class PassiveAuthenticationMiddleware(MiddlewareMixin):
if request.is_ajax():
return
# Skip AJAX and media/script requests, unless mellon_no_passive is False on the view
if getattr(view_func, 'mellon_no_passive', True) and 'text/html' not in request.META.get('HTTP_ACCEPT', ''):
if getattr(view_func, 'mellon_no_passive', True) and 'text/html' not in request.META.get(
'HTTP_ACCEPT', ''
):
return
# Skip views asking to be skiped
if getattr(view_func, 'mellon_no_passive', False):

View File

@ -15,11 +15,22 @@ class Migration(migrations.Migration):
migrations.CreateModel(
name='UserSAMLIdentifier',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
(
'id',
models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True),
),
('issuer', models.TextField(verbose_name='Issuer')),
('name_id', models.TextField(verbose_name='SAML identifier')),
('created', models.DateTimeField(auto_now_add=True, verbose_name='created')),
('user', models.ForeignKey(related_name='saml_identifiers', verbose_name='user', to=settings.AUTH_USER_MODEL, on_delete=models.CASCADE)),
(
'user',
models.ForeignKey(
related_name='saml_identifiers',
verbose_name='user',
to=settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
),
),
],
options={
'verbose_name': 'user SAML identifier',

View File

@ -14,10 +14,20 @@ class Migration(migrations.Migration):
migrations.CreateModel(
name='SessionIndex',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
('session_index', models.TextField(verbose_name='SAML SessionIndex')),
('session_key', models.CharField(max_length=40, verbose_name='Django session key')),
('saml_identifier', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='mellon.UserSAMLIdentifier', verbose_name='SAML identifier')),
(
'saml_identifier',
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to='mellon.UserSAMLIdentifier',
verbose_name='SAML identifier',
),
),
],
options={
'verbose_name': 'SAML SessionIndex',

View File

@ -27,14 +27,11 @@ class UserSAMLIdentifier(models.Model):
verbose_name=_('user'),
to=settings.AUTH_USER_MODEL,
related_name='saml_identifiers',
on_delete=models.CASCADE)
issuer = models.TextField(
verbose_name=_('Issuer'))
name_id = models.TextField(
verbose_name=_('SAML identifier'))
created = models.DateTimeField(
verbose_name=_('created'),
auto_now_add=True)
on_delete=models.CASCADE,
)
issuer = models.TextField(verbose_name=_('Issuer'))
name_id = models.TextField(verbose_name=_('SAML identifier'))
created = models.DateTimeField(verbose_name=_('created'), auto_now_add=True)
class Meta:
verbose_name = _('user SAML identifier')
@ -46,9 +43,8 @@ class SessionIndex(models.Model):
session_index = models.TextField(_('SAML SessionIndex'))
session_key = models.CharField(_('Django session key'), max_length=40)
saml_identifier = models.ForeignKey(
verbose_name=_('SAML identifier'),
to=UserSAMLIdentifier,
on_delete=models.CASCADE)
verbose_name=_('SAML identifier'), to=UserSAMLIdentifier, on_delete=models.CASCADE
)
@staticmethod
def cleanup(cls):
@ -64,6 +60,4 @@ class SessionIndex(models.Model):
class Meta:
verbose_name = _('SAML SessionIndex')
verbose_name_plural = _('SAML SessionIndexes')
unique_together = (
('saml_identifier', 'session_index', 'session_key'),
)
unique_together = (('saml_identifier', 'session_index', 'session_key'),)

View File

@ -7,10 +7,7 @@ from . import views
urlpatterns = [
url('login/$', views.login,
name='mellon_login'),
url('logout/$', views.logout,
name='mellon_logout'),
url('metadata/$', views.metadata,
name='mellon_metadata')
url('login/$', views.login, name='mellon_login'),
url('logout/$', views.logout, name='mellon_logout'),
url('metadata/$', views.metadata, name='mellon_metadata'),
]

View File

@ -35,6 +35,7 @@ from . import app_settings
logger = logging.getLogger(__name__)
def create_metadata(request):
entity_id = reverse('mellon_metadata')
login_url = reverse(app_settings.LOGIN_URL)
@ -59,8 +60,7 @@ def create_metadata(request):
'contact_persons': app_settings.CONTACT_PERSONS,
}
if app_settings.METADATA_PUBLISH_DISCOVERY_RESPONSE:
ctx['discovery_endpoint_url'] = request.build_absolute_uri(
reverse('mellon_login'))
ctx['discovery_endpoint_url'] = request.build_absolute_uri(reverse('mellon_login'))
return render_to_string('mellon/metadata.xml', ctx)
@ -81,8 +81,9 @@ def create_server(request):
else: # no signature
private_key = None
private_key_password = None
server = lasso.Server.newFromBuffers(metadata, private_key_content=private_key,
private_key_password=private_key_password)
server = lasso.Server.newFromBuffers(
metadata, private_key_content=private_key, private_key_password=private_key_password
)
if app_settings.SIGNATURE_METHOD:
symbol_name = 'SIGNATURE_METHOD_' + app_settings.SIGNATURE_METHOD.replace('-', '_').upper()
if hasattr(lasso, symbol_name):
@ -145,10 +146,10 @@ def flatten_datetime(d):
def iso8601_to_datetime(date_string, default=None):
'''Convert a string formatted as an ISO8601 date into a datetime
value.
"""Convert a string formatted as an ISO8601 date into a datetime
value.
This function ignores the sub-second resolution'''
This function ignores the sub-second resolution"""
try:
dt = isodate.parse_datetime(date_string)
except Exception:
@ -170,6 +171,7 @@ def to_list(func):
@wraps(func)
def f(*args, **kwargs):
return list(func(*args, **kwargs))
return f
@ -198,9 +200,9 @@ def get_values(saml_attributes, name):
def get_setting(idp, name, default=None):
'''Get a parameter from an IdP specific configuration or from the main
settings.
'''
"""Get a parameter from an IdP specific configuration or from the main
settings.
"""
return idp.get(name) or getattr(app_settings, name, default)
@ -212,14 +214,16 @@ def make_session_dump(lasso_name_id, indexes):
sp_name_qualifier = lasso_name_id.spNameQualifier and force_text(lasso_name_id.spNameQualifier)
for index in indexes:
issuer = index.saml_identifier.issuer
session_infos.append({
'entity_id': issuer,
'session_index': index.session_index,
'name_id_content': name_id,
'name_id_format': name_id_format,
'name_id_name_qualifier': name_qualifier,
'name_id_sp_name_qualifier': sp_name_qualifier,
})
session_infos.append(
{
'entity_id': issuer,
'session_index': index.session_index,
'name_id_content': name_id,
'name_id_format': name_id_format,
'name_id_name_qualifier': name_qualifier,
'name_id_sp_name_qualifier': sp_name_qualifier,
}
)
session_dump = render_to_string('mellon/session_dump.xml', {'session_infos': session_infos})
return session_dump
@ -286,6 +290,7 @@ def get_xml_encoding(content):
if encoding:
xml_encoding = encoding
parser = expat.ParserCreate()
parser.XmlDeclHandler = xmlDeclHandler
try:
@ -301,5 +306,5 @@ def get_local_path(request, url):
parsed = urlparse(url)
path = parsed.path
if request.META.get('SCRIPT_NAME'):
path = path[len(request.META['SCRIPT_NAME']):]
path = path[len(request.META['SCRIPT_NAME']) :]
return path

View File

@ -48,12 +48,17 @@ RETRY_LOGIN_COOKIE = 'MELLON_RETRY_LOGIN'
lasso.setFlag('thin-sessions')
if six.PY3:
def lasso_decode(x):
return x
else:
def lasso_decode(x):
return x.decode('utf-8')
EO_NS = 'https://www.entrouvert.com/'
LOGIN_HINT = '{%s}login-hint' % EO_NS
@ -69,6 +74,7 @@ class HttpResponseBadRequest(django.http.HttpResponseBadRequest):
class LogMixin(object):
"""Initialize a module logger in new objects"""
def __init__(self, *args, **kwargs):
self.log = logging.getLogger(__name__)
super(LogMixin, self).__init__(*args, **kwargs)
@ -173,13 +179,16 @@ class LoginView(ProfileMixin, LogMixin, View):
login.processAuthnResponseMsg(request.POST['SAMLResponse'])
login.acceptSso()
except lasso.ProfileCannotVerifySignatureError:
self.log.warning('SAML authentication failed: signature validation failed for %r',
login.remoteProviderId)
self.log.warning(
'SAML authentication failed: signature validation failed for %r', login.remoteProviderId
)
except lasso.ParamError:
self.log.exception('lasso param error')
except (lasso.LoginStatusNotSuccessError,
lasso.ProfileStatusNotSuccessError,
lasso.ProfileRequestDeniedError):
except (
lasso.LoginStatusNotSuccessError,
lasso.ProfileStatusNotSuccessError,
lasso.ProfileRequestDeniedError,
):
self.show_message_status_is_not_success(login, 'SAML authentication failed')
except lasso.Error as e:
return HttpResponseBadRequest('error processing the authentication response: %r' % e)
@ -195,15 +204,15 @@ class LoginView(ProfileMixin, LogMixin, View):
idp = utils.get_idp(login.remoteProviderId)
if not idp:
self.log.warning('entity id %r is unknown', login.remoteProviderId)
return HttpResponseBadRequest(
'entity id %r is unknown' % login.remoteProviderId)
return HttpResponseBadRequest('entity id %r is unknown' % login.remoteProviderId)
error_url = utils.get_setting(idp, 'ERROR_URL')
error_redirect_after_timeout = utils.get_setting(idp, 'ERROR_REDIRECT_AFTER_TIMEOUT')
if error_url:
error_url = resolve_url(error_url)
next_url = error_url or self.get_next_url(default=resolve_url(settings.LOGIN_REDIRECT_URL))
return self.render(
request, 'mellon/authentication_failed.html',
request,
'mellon/authentication_failed.html',
{
'debug': settings.DEBUG,
'reason': reason,
@ -212,7 +221,8 @@ class LoginView(ProfileMixin, LogMixin, View):
'next_url': next_url,
'relaystate': login.msgRelayState,
'error_redirect_after_timeout': error_redirect_after_timeout,
})
},
)
def get_attribute_value(self, attribute, attribute_value):
# check attribute_value contains only text
@ -236,10 +246,9 @@ class LoginView(ProfileMixin, LogMixin, View):
if login.nameIdentifier:
name_id = login.nameIdentifier
name_id_format = force_text(name_id.format or lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED)
attributes.update({
'name_id_content': lasso_decode(name_id.content),
'name_id_format': name_id_format
})
attributes.update(
{'name_id_content': lasso_decode(name_id.content), 'name_id_format': name_id_format}
)
if name_id.nameQualifier:
attributes['name_id_name_qualifier'] = force_text(name_id.nameQualifier)
if name_id.spNameQualifier:
@ -249,15 +258,15 @@ class LoginView(ProfileMixin, LogMixin, View):
attributes['authn_instant'] = utils.iso8601_to_datetime(authn_statement.authnInstant)
if authn_statement.sessionNotOnOrAfter:
attributes['session_not_on_or_after'] = utils.iso8601_to_datetime(
authn_statement.sessionNotOnOrAfter)
authn_statement.sessionNotOnOrAfter
)
if authn_statement.sessionIndex:
attributes['session_index'] = authn_statement.sessionIndex
attributes['authn_context_class_ref'] = ()
if authn_statement.authnContext:
authn_context = authn_statement.authnContext
if authn_context.authnContextClassRef:
attributes['authn_context_class_ref'] = \
authn_context.authnContextClassRef
attributes['authn_context_class_ref'] = authn_context.authnContextClassRef
self.log.debug('trying to authenticate with attributes %r', attributes)
response = self.authenticate(request, login, attributes)
response.delete_cookie(RETRY_LOGIN_COOKIE)
@ -276,38 +285,39 @@ class LoginView(ProfileMixin, LogMixin, View):
models.SessionIndex.objects.get_or_create(
saml_identifier=user.saml_identifier,
session_key=request.session.session_key,
session_index=session_index)
self.log.info('user %s (NameID is %r) logged in using SAML', user,
attributes['name_id_content'])
session_index=session_index,
)
self.log.info(
'user %s (NameID is %r) logged in using SAML', user, attributes['name_id_content']
)
request.session['mellon_session'] = utils.flatten_datetime(attributes)
if ('session_not_on_or_after' in attributes and not settings.SESSION_EXPIRE_AT_BROWSER_CLOSE):
if 'session_not_on_or_after' in attributes and not settings.SESSION_EXPIRE_AT_BROWSER_CLOSE:
request.session.set_expiry(
utils.get_seconds_expiry(
attributes['session_not_on_or_after']))
utils.get_seconds_expiry(attributes['session_not_on_or_after'])
)
else:
self.log.warning('user %s (NameID is %r) is inactive, login refused', user,
attributes['name_id_content'])
return self.render(request, 'mellon/inactive_user.html', {
'user': user,
'saml_attributes': attributes})
self.log.warning(
'user %s (NameID is %r) is inactive, login refused', user, attributes['name_id_content']
)
return self.render(
request, 'mellon/inactive_user.html', {'user': user, 'saml_attributes': attributes}
)
else:
self.log.warning('no user found for NameID %r', attributes['name_id_content'])
return self.render(
request, 'mellon/user_not_found.html',
{'saml_attributes': attributes})
return self.render(request, 'mellon/user_not_found.html', {'saml_attributes': attributes})
request.session['lasso_session_dump'] = login.session.dump()
return HttpResponseRedirect(next_url)
def retry_login(self):
'''Retry login if it failed for a temporary error.
"""Retry login if it failed for a temporary error.
Use a cookie to prevent looping forever.
'''
Use a cookie to prevent looping forever.
"""
if RETRY_LOGIN_COOKIE in self.request.COOKIES:
response = self.sso_failure(
self.request,
reason=_('There were too many redirections with the identity provider.'))
self.request, reason=_('There were too many redirections with the identity provider.')
)
response.delete_cookie(RETRY_LOGIN_COOKIE)
return response
url = reverse('mellon_login')
@ -341,28 +351,33 @@ class LoginView(ProfileMixin, LogMixin, View):
return HttpResponseBadRequest('artifact is malformed %r' % artifact)
except lasso.ServerProviderNotFoundError:
self.log.warning('no entity id found for artifact %s', artifact)
return HttpResponseBadRequest(
'no entity id found for this artifact %r' % artifact)
return HttpResponseBadRequest('no entity id found for this artifact %r' % artifact)
idp = utils.get_idp(login.remoteProviderId)
if not idp:
return HttpResponseBadRequest(
'entity id %r is unknown' % login.remoteProviderId)
verify_ssl_certificate = utils.get_setting(
idp, 'VERIFY_SSL_CERTIFICATE')
return HttpResponseBadRequest('entity id %r is unknown' % login.remoteProviderId)
verify_ssl_certificate = utils.get_setting(idp, 'VERIFY_SSL_CERTIFICATE')
login.buildRequestMsg()
try:
result = requests.post(login.msgUrl, data=login.msgBody,
headers={'content-type': 'text/xml'},
timeout=app_settings.ARTIFACT_RESOLVE_TIMEOUT,
verify=verify_ssl_certificate)
result = requests.post(
login.msgUrl,
data=login.msgBody,
headers={'content-type': 'text/xml'},
timeout=app_settings.ARTIFACT_RESOLVE_TIMEOUT,
verify=verify_ssl_certificate,
)
except RequestException as e:
self.log.warning('unable to reach %r: %s', login.msgUrl, e)
return self.sso_failure(request,
reason=_('IdP is temporarily down, please try again ' 'later.'),
status_codes=status_codes)
return self.sso_failure(
request,
reason=_('IdP is temporarily down, please try again ' 'later.'),
status_codes=status_codes,
)
if result.status_code != 200:
self.log.warning('SAML authentication failed: IdP returned %s when given artifact: %r',
result.status_code, result.content)
self.log.warning(
'SAML authentication failed: IdP returned %s when given artifact: %r',
result.status_code,
result.content,
)
return self.sso_failure(request, reason=idp_message, status_codes=status_codes)
self.log.info('Got SAML Artifact Response', extra={'saml_response': result.content})
@ -377,18 +392,20 @@ class LoginView(ProfileMixin, LogMixin, View):
except lasso.ProfileInvalidMsgError:
self.log.warning('ArtifactResolveResponse is malformed %r', result.content[:200])
if settings.DEBUG:
return HttpResponseBadRequest('ArtififactResolveResponse is malformed\n%r' %
result.content)
return HttpResponseBadRequest('ArtififactResolveResponse is malformed\n%r' % result.content)
else:
return HttpResponseBadRequest('ArtififactResolveResponse is malformed')
except lasso.ProfileCannotVerifySignatureError:
self.log.warning('SAML authentication failed: signature validation failed for %r',
login.remoteProviderId)
self.log.warning(
'SAML authentication failed: signature validation failed for %r', login.remoteProviderId
)
except lasso.ParamError:
self.log.exception('lasso param error')
except (lasso.LoginStatusNotSuccessError,
lasso.ProfileStatusNotSuccessError,
lasso.ProfileRequestDeniedError):
except (
lasso.LoginStatusNotSuccessError,
lasso.ProfileStatusNotSuccessError,
lasso.ProfileRequestDeniedError,
):
status = login.response.status
a = status
while a.statusCode:
@ -414,8 +431,7 @@ class LoginView(ProfileMixin, LogMixin, View):
url = app_settings.DISCOVERY_SERVICE_URL
params = {
# prevent redirect loops with the discovery service
'entityID': request.build_absolute_uri(
reverse('mellon_metadata')),
'entityID': request.build_absolute_uri(reverse('mellon_metadata')),
'return': return_url,
}
if is_passive:
@ -429,11 +445,12 @@ class LoginView(ProfileMixin, LogMixin, View):
return self.continue_sso_artifact(request, lasso.HTTP_METHOD_ARTIFACT_GET)
# redirect to discovery service if needed
if ('entityID' not in request.GET
and 'nodisco' not in request.GET
and app_settings.DISCOVERY_SERVICE_URL):
return self.request_discovery_service(
request, is_passive=request.GET.get('passive') == '1')
if (
'entityID' not in request.GET
and 'nodisco' not in request.GET
and app_settings.DISCOVERY_SERVICE_URL
):
return self.request_discovery_service(request, is_passive=request.GET.get('passive') == '1')
next_url = check_next_url(self.request, request.GET.get(REDIRECT_FIELD_NAME))
idp = self.get_idp(request)
@ -467,15 +484,21 @@ class LoginView(ProfileMixin, LogMixin, View):
# lasso>2.5.1 introduced a better API
if hasattr(authn_request.extensions, 'any'):
authn_request.extensions.any = (
str('<eo:next_url xmlns:eo="https://www.entrouvert.com/">%s</eo:next_url>' % eo_next_url),
str(
'<eo:next_url xmlns:eo="https://www.entrouvert.com/">%s</eo:next_url>'
% eo_next_url
),
)
else:
authn_request.extensions.setOriginalXmlnode(
str('''<samlp:Extensions
str(
'''<samlp:Extensions
xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
xmlns:eo="https://www.entrouvert.com/">
<eo:next_url>%s</eo:next_url>
</samlp:Extensions>''' % eo_next_url)
</samlp:Extensions>'''
% eo_next_url
)
)
self.set_next_url(next_url)
self.add_login_hints(idp, authn_request, request=request, next_url=next_url or '/')
@ -548,7 +571,9 @@ class LogoutView(ProfileMixin, LogMixin, View):
# there is not current session
not request.user.is_authenticated
# or the current session is not part of the list
or request.session.session_key not in session_keys))
or request.session.session_key not in session_keys
)
)
if asynchronous_logout:
current_session_key = request.session.session_key if request.user.is_authenticated else None
@ -585,23 +610,24 @@ class LogoutView(ProfileMixin, LogMixin, View):
issuer = force_text(logout.remoteProviderId)
session_indexes = set(force_text(sessionIndex) for sessionIndex in logout.request.sessionIndexes)
saml_identifier = models.UserSAMLIdentifier.objects.filter(
name_id=force_text(logout.nameIdentifier.content),
issuer=issuer).select_related('user').first()
saml_identifier = (
models.UserSAMLIdentifier.objects.filter(
name_id=force_text(logout.nameIdentifier.content), issuer=issuer
)
.select_related('user')
.first()
)
if saml_identifier:
name_id_user = saml_identifier.user
indexes = models.SessionIndex.objects.select_related(
'saml_identifier').filter(
saml_identifier=saml_identifier)
indexes = models.SessionIndex.objects.select_related('saml_identifier').filter(
saml_identifier=saml_identifier
)
if session_indexes:
indexes = indexes.filter(session_index__in=session_indexes)
# lasso has too much state :/
logout.setSessionFromDump(
utils.make_session_dump(
logout.nameIdentifier,
indexes))
logout.setSessionFromDump(utils.make_session_dump(logout.nameIdentifier, indexes))
try:
logout.validateRequest()
@ -618,7 +644,8 @@ class LogoutView(ProfileMixin, LogMixin, View):
saml_user=name_id_user,
session_indexes=session_indexes,
indexes=indexes,
mode=mode)
mode=mode,
)
try:
logout.buildResponseMsg()

View File

@ -25,6 +25,7 @@ class compile_translations(Command):
def run(self):
import os
from django.core.management import call_command
os.environ.pop('DJANGO_SETTINGS_MODULE', None)
for path in ['mellon/']:
if path.endswith('.py'):
@ -63,49 +64,52 @@ class install_lib(_install_lib):
def get_version():
'''Use the VERSION, if absent generates a version with git describe, if not
tag exists, take 0.0.0- and add the length of the commit log.
'''
"""Use the VERSION, if absent generates a version with git describe, if not
tag exists, take 0.0.0- and add the length of the commit log.
"""
if os.path.exists('VERSION'):
with open('VERSION', 'r') as v:
return v.read()
if os.path.exists('.git'):
p = subprocess.Popen(['git', 'describe', '--dirty', '--match=v*'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
p = subprocess.Popen(
['git', 'describe', '--dirty', '--match=v*'], stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
result = p.communicate()[0]
if p.returncode == 0:
return result.decode('ascii').split()[0][1:].replace('-', '.')
else:
return '0.0.0-%s' % len(subprocess.check_output(
['git', 'rev-list', 'HEAD']).splitlines())
return '0.0.0-%s' % len(subprocess.check_output(['git', 'rev-list', 'HEAD']).splitlines())
return '0.0.0'
setup(name="django-mellon",
version=get_version(),
license="AGPLv3 or later",
description="SAML 2.0 authentication for Django",
long_description=open('README').read(),
url="http://dev.entrouvert.org/projects/django-mellon/",
author="Entr'ouvert",
author_email="info@entrouvert.org",
include_package_data=True,
packages=find_packages(),
install_requires=[
'django>=1.11,<2.3',
'requests',
'isodate',
'atomicwrites',
],
setup_requires=[
'django>=1.10,<2.3',
],
tests_require=[
'nose>=0.11.4',
],
dependency_links=[],
cmdclass={
'build': build,
'install_lib': install_lib,
'compile_translations': compile_translations,
'sdist': eo_sdist,
})
setup(
name="django-mellon",
version=get_version(),
license="AGPLv3 or later",
description="SAML 2.0 authentication for Django",
long_description=open('README').read(),
url="http://dev.entrouvert.org/projects/django-mellon/",
author="Entr'ouvert",
author_email="info@entrouvert.org",
include_package_data=True,
packages=find_packages(),
install_requires=[
'django>=1.11,<2.3',
'requests',
'isodate',
'atomicwrites',
],
setup_requires=[
'django>=1.10,<2.3',
],
tests_require=[
'nose>=0.11.4',
],
dependency_links=[],
cmdclass={
'build': build,
'install_lib': install_lib,
'compile_translations': compile_translations,
'sdist': eo_sdist,
},
)

View File

@ -36,9 +36,9 @@ def app(request, settings):
@pytest.fixture
def concurrency(settings):
'''Select a level of concurrency based on the db, sqlite3 is less robust
thant postgres due to its transaction lock timeout of 5 seconds.
'''
"""Select a level of concurrency based on the db, sqlite3 is less robust
thant postgres due to its transaction lock timeout of 5 seconds.
"""
if 'sqlite' in settings.DATABASES['default']['ENGINE']:
return 20
else:
@ -49,11 +49,13 @@ def concurrency(settings):
def private_settings(request, tmpdir):
import django.conf
from django.conf import UserSettingsHolder
old = django.conf.settings._wrapped
django.conf.settings._wrapped = UserSettingsHolder(old)
def finalizer():
django.conf.settings._wrapped = old
request.addfinalizer(finalizer)
return django.conf.settings
@ -61,6 +63,7 @@ def private_settings(request, tmpdir):
@pytest.fixture
def caplog(caplog):
import py.io
caplog.set_level(logging.INFO)
caplog.handler.stream = py.io.TextIO()
caplog.handler.records = []

View File

@ -115,6 +115,7 @@ def test_lookup_user_transaction(transactional_db, concurrency, idp, saml_attrib
return adapter.lookup_user(idp, saml_attributes)
finally:
connection.close()
users = p.map(f, range(concurrency))
assert len(users) == concurrency
@ -223,7 +224,9 @@ def test_lookup_user_transient_with_email(rf, private_settings, idp, saml_attrib
user = adapter.lookup_user(idp, saml_attributes)
assert User.objects.count() == 0
request._messages.add.assert_called_once_with(30, 'A transient NameID was received but TRANSIENT_FEDERATION_ATTRIBUTE is not set.', '')
request._messages.add.assert_called_once_with(
30, 'A transient NameID was received but TRANSIENT_FEDERATION_ATTRIBUTE is not set.', ''
)
private_settings.MELLON_TRANSIENT_FEDERATION_ATTRIBUTE = 'email'
user = adapter.lookup_user(idp, saml_attributes)
@ -378,8 +381,9 @@ def test_load_metadata_url(settings, adapter, metadata, httpserver, freezer, cap
with open(idp['METADATA_PATH']) as fd:
assert fd.read() == metadata
assert idp['METADATA_PATH_LAST_UPDATE'] == now + 1
httpserver.serve_content(content=metadata.replace('idp5', 'idp6'),
headers={'Content-Type': 'application/xml'})
httpserver.serve_content(
content=metadata.replace('idp5', 'idp6'), headers={'Content-Type': 'application/xml'}
)
assert adapter.load_metadata(idp, 0) == metadata
freezer.move_to(datetime.timedelta(seconds=3601))

View File

@ -61,9 +61,11 @@ def sp_public_key():
@fixture
def sp_settings(private_settings, idp_metadata, sp_private_key, sp_public_key):
private_settings.MELLON_IDENTITY_PROVIDERS = [{
'METADATA': idp_metadata,
}]
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA': idp_metadata,
}
]
private_settings.MELLON_PUBLIC_KEYS = [sp_public_key]
private_settings.MELLON_PRIVATE_KEYS = [sp_private_key]
private_settings.MELLON_NAME_ID_POLICY_FORMAT = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
@ -106,24 +108,27 @@ class MockIdp(object):
#
# it means Deflate instead of GZIP (same stream no header, no trailer)
self.request = zlib.decompress(
base64.b64decode(
urlparse.parse_qs(
urlparse.urlparse(url).query)['SAMLRequest'][0]), -15)
base64.b64decode(urlparse.parse_qs(urlparse.urlparse(url).query)['SAMLRequest'][0]), -15
)
assert 'rsa-sha256' in url
try:
login.validateRequestMsg(auth_result, consent)
except lasso.LoginRequestDeniedError:
pass
else:
login.buildAssertion(lasso.SAML_AUTHENTICATION_METHOD_PASSWORD,
datetime.datetime.now().isoformat(),
None,
datetime.datetime.now().isoformat(),
datetime.datetime.now().isoformat())
login.buildAssertion(
lasso.SAML_AUTHENTICATION_METHOD_PASSWORD,
datetime.datetime.now().isoformat(),
None,
datetime.datetime.now().isoformat(),
datetime.datetime.now().isoformat(),
)
def add_attribute(name, *values, **kwargs):
fmt = kwargs.get('fmt', lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC)
statements = login.response.assertion[0].attributeStatement or [lasso.Saml2AttributeStatement()]
statements = login.response.assertion[0].attributeStatement or [
lasso.Saml2AttributeStatement()
]
statement = statements[0]
login.response.assertion[0].attributeStatement = statements
attributes = list(statement.attribute)
@ -148,6 +153,7 @@ class MockIdp(object):
mtn.textChild = True
value_any.append(mtn)
atv.any = value_any
add_attribute('email', 'john', '.doe@gmail.com')
add_attribute('wtf', 'john', lasso.MiscTextNode.newWithXmlNode('<a>coucou</a>'))
add_attribute('first_name', '<i>Fr\xe9d\xe9ric</i>')
@ -213,8 +219,8 @@ class MockIdp(object):
@all_requests
def f(url, request):
content = self.resolve_artifact(request.body)
return mock_response(200, content=content,
headers={'Content-Type': 'application/soap+xml'})
return mock_response(200, content=content, headers={'Content-Type': 'application/soap+xml'})
return f
@ -419,18 +425,23 @@ def test_sso(db, app, idp, caplog, sp_settings):
def test_sso_request_denied(db, app, idp, caplog, sp_settings):
response = app.get(reverse('mellon_login'))
url, body, relay_state = idp.process_authn_request_redirect(
response['Location'],
auth_result=False,
msg='User is not allowed to login')
response['Location'], auth_result=False, msg='User is not allowed to login'
)
assert not relay_state
assert url.endswith(reverse('mellon_login'))
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
if six.PY3:
assert "status is not success codes: ['urn:oasis:names:tc:SAML:2.0:status:Responder',\
'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']" in caplog.text
assert (
"status is not success codes: ['urn:oasis:names:tc:SAML:2.0:status:Responder',\
'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']"
in caplog.text
)
else:
assert "status is not success codes: [u'urn:oasis:names:tc:SAML:2.0:status:Responder',\
u'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']" in caplog.text
assert (
"status is not success codes: [u'urn:oasis:names:tc:SAML:2.0:status:Responder',\
u'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']"
in caplog.text
)
@pytest.mark.urls('urls_tests_template_base')
@ -438,9 +449,8 @@ def test_template_base(db, app, idp, caplog, sp_settings):
response = app.get(reverse('mellon_metadata'))
response = app.get(reverse('mellon_login'))
url, body, relay_state = idp.process_authn_request_redirect(
response['Location'],
auth_result=False,
msg='User is not allowed to login')
response['Location'], auth_result=False, msg='User is not allowed to login'
)
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
assert 'Theme is ok' in response.text
@ -456,9 +466,8 @@ def test_template_hook(db, app, idp, caplog, sp_settings):
response = app.get(reverse('mellon_metadata'))
response = app.get(reverse('mellon_login'))
url, body, relay_state = idp.process_authn_request_redirect(
response['Location'],
auth_result=False,
msg='User is not allowed to login')
response['Location'], auth_result=False, msg='User is not allowed to login'
)
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
assert 'Theme is ok' in response.text
assert 'HOOK' in response.text
@ -473,9 +482,8 @@ def test_template_hook(db, app, idp, caplog, sp_settings):
def test_no_template_base(db, app, idp, caplog, sp_settings):
response = app.get(reverse('mellon_login'))
url, body, relay_state = idp.process_authn_request_redirect(
response['Location'],
auth_result=False,
msg='User is not allowed to login')
response['Location'], auth_result=False, msg='User is not allowed to login'
)
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
assert 'Theme is ok' not in response.text
@ -487,9 +495,8 @@ def test_sso_request_denied_artifact(db, app, caplog, sp_settings, idp_metadata,
idp = MockIdp(idp_metadata, idp_private_key, sp_metadata)
response = app.get(reverse('mellon_login'))
url, body, relay_state = idp.process_authn_request_redirect(
response['Location'],
auth_result=False,
msg='User is not allowed to login')
response['Location'], auth_result=False, msg='User is not allowed to login'
)
assert not relay_state
assert body is None
assert reverse('mellon_login') in url
@ -497,8 +504,11 @@ def test_sso_request_denied_artifact(db, app, caplog, sp_settings, idp_metadata,
acs_artifact_url = url.split('testserver', 1)[1]
with HTTMock(idp.mock_artifact_resolver()):
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
assert "status is not success codes: ['urn:oasis:names:tc:SAML:2.0:status:Responder',\
'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']" in caplog.text
assert (
"status is not success codes: ['urn:oasis:names:tc:SAML:2.0:status:Responder',\
'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']"
in caplog.text
)
assert 'User is not allowed to login' in response
@ -653,8 +663,10 @@ def test_middleware_mixin_first_time(db, app, idp, caplog, settings):
app.set_cookie(str('IDP_SESSION'), str('1'))
response = app.get('/', headers={'Accept': force_str('text/html')}, status=302)
assert urlparse.urlparse(response.location).path == '/login/'
assert (urlparse.parse_qs(urlparse.urlparse(response.location).query, keep_blank_values=True)
== {'next': ['http://testserver/'], 'passive': ['']})
assert urlparse.parse_qs(urlparse.urlparse(response.location).query, keep_blank_values=True) == {
'next': ['http://testserver/'],
'passive': [''],
}
# simulate closing of session at IdP
app.cookiejar.clear('testserver.local', '/', 'IDP_SESSION')
@ -669,8 +681,10 @@ def test_middleware_mixin_first_time(db, app, idp, caplog, settings):
app.set_cookie(str('IDP_SESSION'), str('1'))
response = app.get('/', headers={'Accept': force_str('text/html')}, status=302)
assert urlparse.urlparse(response.location).path == '/login/'
assert (urlparse.parse_qs(urlparse.urlparse(response.location).query, keep_blank_values=True)
== {'next': ['http://testserver/'], 'passive': ['']})
assert urlparse.parse_qs(urlparse.urlparse(response.location).query, keep_blank_values=True) == {
'next': ['http://testserver/'],
'passive': [''],
}
assert 'MELLON_PASSIVE_TRIED' in app.cookies
@ -687,5 +701,7 @@ def test_sso_user_change(db, app, idp, caplog, sp_settings):
assert 'created new user' in caplog.text
caplog.clear()
response = app.post(reverse('mellon_login'), params={'SAMLResponse': other_body, 'RelayState': other_relay_state})
response = app.post(
reverse('mellon_login'), params={'SAMLResponse': other_body, 'RelayState': other_relay_state}
)
assert 'created new user' in caplog.text

View File

@ -39,46 +39,79 @@ def test_create_metadata(rf, private_settings, caplog):
metadata = create_metadata(request)
assert_xml_constraints(
metadata.encode('utf-8'),
('/sm:EntityDescriptor[@entityID="http://testserver/metadata/"]', 1,
('/*', 1),
('/sm:SPSSODescriptor', 1,
('/*', 7),
('/sm:NameIDFormat', 1),
('/sm:SingleLogoutService', 2),
('/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact\']', 1),
('/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST\']',
0),
('/sm:AssertionConsumerService[@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST\']',
1),
('/sm:KeyDescriptor/ds:KeyInfo/ds:X509Data', 2,
('/ds:X509Certificate', 2),
('/ds:X509Certificate[text()=\'xxx\']', 1),
('/ds:X509Certificate[text()=\'yyy\']', 1)))),
namespaces=ns)
(
'/sm:EntityDescriptor[@entityID="http://testserver/metadata/"]',
1,
('/*', 1),
(
'/sm:SPSSODescriptor',
1,
('/*', 7),
('/sm:NameIDFormat', 1),
('/sm:SingleLogoutService', 2),
(
'/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact\']',
1,
),
(
'/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST\']',
0,
),
(
'/sm:AssertionConsumerService[@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST\']',
1,
),
(
'/sm:KeyDescriptor/ds:KeyInfo/ds:X509Data',
2,
('/ds:X509Certificate', 2),
('/ds:X509Certificate[text()=\'xxx\']', 1),
('/ds:X509Certificate[text()=\'yyy\']', 1),
),
),
),
namespaces=ns,
)
private_settings.MELLON_METADATA_PUBLISH_DISCOVERY_RESPONSE = True
with mock.patch('mellon.utils.open', mock.mock_open(read_data='BEGIN\nyyy\nEND'), create=True):
metadata = create_metadata(request)
assert_xml_constraints(
metadata.encode('utf-8'),
('/sm:EntityDescriptor[@entityID="http://testserver/metadata/"]', 1,
('/*', 1),
('/sm:SPSSODescriptor', 1,
('/*', 8),
('/sm:Extensions', 1,
('/idpdisc:DiscoveryResponse', 1)),
('/sm:NameIDFormat', 1),
('/sm:SingleLogoutService', 2),
('/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact\']', 1),
('/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST\']',
0),
('/sm:AssertionConsumerService[@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST\']',
1),
('/sm:KeyDescriptor/ds:KeyInfo/ds:X509Data', 2,
('/ds:X509Certificate', 2),
('/ds:X509Certificate[text()=\'xxx\']', 1),
('/ds:X509Certificate[text()=\'yyy\']', 1)))),
namespaces=ns)
(
'/sm:EntityDescriptor[@entityID="http://testserver/metadata/"]',
1,
('/*', 1),
(
'/sm:SPSSODescriptor',
1,
('/*', 8),
('/sm:Extensions', 1, ('/idpdisc:DiscoveryResponse', 1)),
('/sm:NameIDFormat', 1),
('/sm:SingleLogoutService', 2),
(
'/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact\']',
1,
),
(
'/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST\']',
0,
),
(
'/sm:AssertionConsumerService[@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST\']',
1,
),
(
'/sm:KeyDescriptor/ds:KeyInfo/ds:X509Data',
2,
('/ds:X509Certificate', 2),
('/ds:X509Certificate[text()=\'xxx\']', 1),
('/ds:X509Certificate[text()=\'yyy\']', 1),
),
),
),
namespaces=ns,
)
def test_iso8601_to_datetime(private_settings):
@ -91,18 +124,18 @@ def test_iso8601_to_datetime(private_settings):
django.utils.timezone._localtime = None
private_settings.USE_TZ = False
# UTC ISO8601 -> naive datetime UTC
assert iso8601_to_datetime('2010-10-01T10:10:34Z') == datetime.datetime(
2010, 10, 1, 10, 10, 34)
assert iso8601_to_datetime('2010-10-01T10:10:34Z') == datetime.datetime(2010, 10, 1, 10, 10, 34)
# NAIVE ISO8601 -> naive datetime UTC
assert iso8601_to_datetime('2010-10-01T10:10:34') == datetime.datetime(
2010, 10, 1, 10, 10, 34)
assert iso8601_to_datetime('2010-10-01T10:10:34') == datetime.datetime(2010, 10, 1, 10, 10, 34)
private_settings.USE_TZ = True
# UTC+1h ISO8601 -> Aware datetime UTC
assert iso8601_to_datetime('2010-10-01T10:10:34+01:00') == datetime.datetime(
2010, 10, 1, 9, 10, 34, tzinfo=pytz.utc)
2010, 10, 1, 9, 10, 34, tzinfo=pytz.utc
)
# Naive ISO8601 -> Aware datetime UTC
assert iso8601_to_datetime('2010-10-01T10:10:34') == datetime.datetime(
2010, 10, 1, 10, 10, 34, tzinfo=pytz.utc)
2010, 10, 1, 10, 10, 34, tzinfo=pytz.utc
)
def test_flatten_datetime():

View File

@ -52,21 +52,21 @@ def test_metadata(private_settings, client):
{
'LABEL': 'FoobarEnglish',
'LANG': 'en',
}
},
],
'DISPLAY_NAMES': [
'Foobar',
{
'LABEL': 'FoobarEnglish',
'LANG': 'en',
}
},
],
'URLS': [
'http://foobar.com/',
{
'URL': 'http://foobar.com/en/',
'LANG': 'en',
}
},
],
}
private_settings.MELLON_CONTACT_PERSONS = [
@ -104,38 +104,57 @@ def test_metadata(private_settings, client):
response = client.get('/metadata/')
assert_xml_constraints(
response.content,
('/sm:EntityDescriptor[@entityID="http://testserver/metadata/"]', 1,
('/*', 4),
('/sm:SPSSODescriptor', 1,
('/*', 7),
('/sm:NameIDFormat', 1),
('/sm:SingleLogoutService', 2),
('/sm:AssertionConsumerService', None,
('[@isDefault="true"]', None,
('[@Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact"]', 1),
('[@Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"]', 0)),
('[@Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"]', 1)),
('/sm:KeyDescriptor/ds:KeyInfo/ds:X509Data', 2,
('/ds:X509Certificate', 2),
('/ds:X509Certificate[text()="xxx"]', 1),
('/ds:X509Certificate[text()="yyy"]', 1))),
('/sm:Organization', 1,
('/sm:OrganizationName', 2),
('/sm:OrganizationName[text()="Foobar"]', 1),
('/sm:OrganizationName[text()="FoobarEnglish"]', 1,
('[@xml:lang="en"]', 1)),
('/sm:OrganizationDisplayName', 2),
('/sm:OrganizationDisplayName[text()="Foobar"]', 1),
('/sm:OrganizationDisplayName[text()="FoobarEnglish"]', 1,
('[@xml:lang="en"]', 1)),
('/sm:OrganizationURL', 2),
('/sm:OrganizationURL[text()="http://foobar.com/"]', 1),
('/sm:OrganizationURL[text()="http://foobar.com/en/"]', 1,
('[@xml:lang="en"]', 1))),
('/sm:ContactPerson', 2,
('[@contactType="technical"]', 1),
('[@contactType="administrative"]', 1))),
namespaces=ns)
(
'/sm:EntityDescriptor[@entityID="http://testserver/metadata/"]',
1,
('/*', 4),
(
'/sm:SPSSODescriptor',
1,
('/*', 7),
('/sm:NameIDFormat', 1),
('/sm:SingleLogoutService', 2),
(
'/sm:AssertionConsumerService',
None,
(
'[@isDefault="true"]',
None,
('[@Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact"]', 1),
('[@Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"]', 0),
),
('[@Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"]', 1),
),
(
'/sm:KeyDescriptor/ds:KeyInfo/ds:X509Data',
2,
('/ds:X509Certificate', 2),
('/ds:X509Certificate[text()="xxx"]', 1),
('/ds:X509Certificate[text()="yyy"]', 1),
),
),
(
'/sm:Organization',
1,
('/sm:OrganizationName', 2),
('/sm:OrganizationName[text()="Foobar"]', 1),
('/sm:OrganizationName[text()="FoobarEnglish"]', 1, ('[@xml:lang="en"]', 1)),
('/sm:OrganizationDisplayName', 2),
('/sm:OrganizationDisplayName[text()="Foobar"]', 1),
('/sm:OrganizationDisplayName[text()="FoobarEnglish"]', 1, ('[@xml:lang="en"]', 1)),
('/sm:OrganizationURL', 2),
('/sm:OrganizationURL[text()="http://foobar.com/"]', 1),
('/sm:OrganizationURL[text()="http://foobar.com/en/"]', 1, ('[@xml:lang="en"]', 1)),
),
(
'/sm:ContactPerson',
2,
('[@contactType="technical"]', 1),
('[@contactType="administrative"]', 1),
),
),
namespaces=ns,
)
def test_sp_initiated_login_improperly_configured2(private_settings, client):
@ -151,8 +170,10 @@ def test_sp_initiated_login_discovery_service(private_settings, client):
assert response.status_code == 302
params = parse_qs(urlparse(response['Location']).query)
assert response['Location'].startswith('https://disco?')
assert params == {'return': ['http://testserver/login/?nodisco=1'],
'entityID': ['http://testserver/metadata/']}
assert params == {
'return': ['http://testserver/login/?nodisco=1'],
'entityID': ['http://testserver/metadata/'],
}
def test_sp_initiated_login_discovery_service_passive(private_settings, client):
@ -161,9 +182,11 @@ def test_sp_initiated_login_discovery_service_passive(private_settings, client):
assert response.status_code == 302
params = parse_qs(urlparse(response['Location']).query)
assert response['Location'].startswith('https://disco?')
assert params == {'isPassive': ['true'],
'entityID': ['http://testserver/metadata/'],
'return': ['http://testserver/login/?passive=1&nodisco=1']}
assert params == {
'isPassive': ['true'],
'entityID': ['http://testserver/metadata/'],
'return': ['http://testserver/login/?passive=1&nodisco=1'],
}
def test_sp_initiated_login_discovery_service_nodisco(private_settings, client):
@ -175,9 +198,11 @@ def test_sp_initiated_login_discovery_service_nodisco(private_settings, client):
def test_sp_initiated_login(private_settings, client):
private_settings.MELLON_IDENTITY_PROVIDERS = [{
'METADATA': open('tests/metadata.xml').read(),
}]
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA': open('tests/metadata.xml').read(),
}
]
response = client.get('/login/?next=%2Fwhatever')
assert response.status_code == 302
params = parse_qs(urlparse(response['Location']).query)
@ -189,13 +214,17 @@ def test_sp_initiated_login(private_settings, client):
def test_sp_initiated_login_chosen(private_settings, client):
private_settings.MELLON_IDENTITY_PROVIDERS = [{
'METADATA': open('tests/metadata.xml').read(),
}]
qs = urlencode({
'entityID': 'http://idp5/metadata',
'next': '/whatever',
})
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA': open('tests/metadata.xml').read(),
}
]
qs = urlencode(
{
'entityID': 'http://idp5/metadata',
'next': '/whatever',
}
)
response = client.get('/login/?' + qs)
assert response.status_code == 302
params = parse_qs(urlparse(response['Location']).query)
@ -207,11 +236,12 @@ def test_sp_initiated_login_chosen(private_settings, client):
def test_sp_initiated_login_requested_authn_context(private_settings, client):
private_settings.MELLON_IDENTITY_PROVIDERS = [{
'METADATA': open('tests/metadata.xml').read(),
'AUTHN_CLASSREF': ['urn:be:fedict:iam:fas:citizen:eid',
'urn:be:fedict:iam:fas:citizen:token'],
}]
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA': open('tests/metadata.xml').read(),
'AUTHN_CLASSREF': ['urn:be:fedict:iam:fas:citizen:eid', 'urn:be:fedict:iam:fas:citizen:token'],
}
]
response = client.get('/login/')
assert response.status_code == 302
params = parse_qs(urlparse(response['Location']).query)
@ -222,13 +252,17 @@ def test_sp_initiated_login_requested_authn_context(private_settings, client):
request = lasso.Samlp2AuthnRequest()
assert request.initFromQuery(urlparse(response['Location']).query)
assert request.requestedAuthnContext.authnContextClassRef == (
'urn:be:fedict:iam:fas:citizen:eid', 'urn:be:fedict:iam:fas:citizen:token')
'urn:be:fedict:iam:fas:citizen:eid',
'urn:be:fedict:iam:fas:citizen:token',
)
def test_malfortmed_artifact(private_settings, client, caplog):
private_settings.MELLON_IDENTITY_PROVIDERS = [{
'METADATA': open('tests/metadata.xml').read(),
}]
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA': open('tests/metadata.xml').read(),
}
]
response = client.get('/login/?SAMLart=xxx', status=400)
assert response['Content-Type'] == 'text/plain'
assert response['X-Content-Type-Options'] == 'nosniff'
@ -244,18 +278,22 @@ def artifact():
def test_error_500_on_artifact_resolve(private_settings, client, caplog, artifact):
private_settings.MELLON_IDENTITY_PROVIDERS = [{
'METADATA': open('tests/metadata.xml').read(),
}]
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA': open('tests/metadata.xml').read(),
}
]
with HTTMock(error_500):
client.get('/login/?SAMLart=%s' % artifact)
assert 'IdP returned 500' in caplog.text
def test_invalid_msg_on_artifact_resolve(private_settings, client, caplog, artifact):
private_settings.MELLON_IDENTITY_PROVIDERS = [{
'METADATA': open('tests/metadata.xml').read(),
}]
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA': open('tests/metadata.xml').read(),
}
]
with HTTMock(html_response):
client.get('/login/?SAMLart=%s' % artifact)
assert 'ArtifactResolveResponse is malformed' in caplog.text

View File

@ -20,6 +20,7 @@ from django.http import HttpResponse
def homepage(request):
return HttpResponse('ok')
urlpatterns = [
url('^', include('mellon.urls')),
url('^$', homepage, name='homepage'),

View File

@ -20,6 +20,7 @@ from django.http import HttpResponse
def homepage(request):
return HttpResponse('ok')
urlpatterns = [
url('^', include('mellon.urls'), kwargs={'template_base': 'theme.html'}),
url('^$', homepage, name='homepage'),

View File

@ -20,14 +20,19 @@ from django.http import HttpResponse
def homepage(request):
return HttpResponse('ok')
def context_hook(context):
context['hook'] = 'HOOK'
urlpatterns = [
url('^', include('mellon.urls'), kwargs={
'template_base': 'theme.html',
'context_hook': context_hook,
}),
url(
'^',
include('mellon.urls'),
kwargs={
'template_base': 'theme.html',
'context_hook': context_hook,
},
),
url('^$', homepage, name='homepage'),
]

View File

@ -18,6 +18,7 @@ from lxml import etree as ET
def assert_xml_constraints(content, constraint, namespaces={}):
d = ET.fromstring(content)
def check(constraint, prefix=''):
path, count = constraint[:2]
path = prefix + path
@ -27,4 +28,5 @@ def assert_xml_constraints(content, constraint, namespaces={}):
if len(constraint) > 2:
for constraint in constraint[2:]:
check(constraint, prefix=path)
check(constraint)

View File

@ -14,8 +14,7 @@ DATABASES = {
DEBUG = True
SECRET_KEY = 'xx'
STATIC_URL = '/static/'
INSTALLED_APPS = ('mellon', 'django.contrib.auth',
'django.contrib.contenttypes', 'django.contrib.sessions')
INSTALLED_APPS = ('mellon', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions')
if hasattr(global_settings, 'MIDDLEWARE_CLASSES'):
MIDDLEWARE_CLASSES = global_settings.MIDDLEWARE_CLASSES
MIDDLEWARE_CLASSES += (
@ -31,9 +30,7 @@ else:
'mellon.middleware.PassiveAuthenticationMiddleware',
)
AUTHENTICATION_BACKENDS = (
'mellon.backends.SAMLBackend',
)
AUTHENTICATION_BACKENDS = ('mellon.backends.SAMLBackend',)
ROOT_URLCONF = 'urls_tests'
TEMPLATE_DIRS = [
'tests/templates/',